博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
分布式项目(六)iot-device-data 设备数据监控
阅读量:5827 次
发布时间:2019-06-18

本文共 6314 字,大约阅读时间需要 21 分钟。

hot3.png

书接上回,当设备发送数据后,我们需要在管理页面看到设备发送的数据是什么,所以现在我们就来完成设备数据监控模块

iot-device-data

创建iot-device-data模块,因为此模块也是属于订阅板块的服务,所以它也是消费kakfa中的Mapping数据,引入对应的redis,kakfa模块。

逻辑图

web socket

数据是实时与页面交互,需要用到web socket

org.springframework.boot
spring-boot-starter-websocket
2.1.4.RELEASE

spring boot 对web socket的使用有两种方式,ServerEndpointExporter和TextWebSocketHandler方式,而ServerEndpointExporter,因为webSocket bean不是由spring 容器管理,所以会有注入无法使用的问题,虽然可以解决,但有点不爽;而TextWebSocketHandler,则完全由spring实现,所以不会有注入的问题,因此笔者采用TextWebSocketHandler的方式。

web socket handler

[@Component](https://my.oschina.net/u/3907912)public class DeviceDateHandler extends TextWebSocketHandler {    @Autowired    private DeviceDateService deviceDateService;    /**     * 打开会话     * [@param](https://my.oschina.net/u/2303379) session     */    [@Override](https://my.oschina.net/u/1162528)    public void afterConnectionEstablished(WebSocketSession session) 	throws Exception {        deviceDateService.onOpen(session);    }    /**     * 关闭会话     * [@param](https://my.oschina.net/u/2303379) session     */    @Override    public void afterConnectionClosed(WebSocketSession session, 	CloseStatus status) throws Exception {        deviceDateService.onClose(session);    }    /**     * 异常处理     * @param session     * @param     */    @Override    public void handleTransportError(WebSocketSession session, 	Throwable exception) throws Exception {        deviceDateService.onError(session,exception);    }}

web socket config

@Configuration@EnableWebSocket //开启web socketpublic class WebSocketConfig implements WebSocketConfigurer {    @Autowired    private DeviceDateHandler deviceDateHandler;    @Override    public void registerWebSocketHandlers(WebSocketHandlerRegistry 	registry) {        registry.addHandler(deviceDateHandler, "/ws/device").		setAllowedOrigins("*"); //允许跨域    }}

DeviceDateService web socket建立登录

web socket建立登录时,需要把设备的IMEI和id号一起发送过来,我们在onOpen()方法中对数据进行校验,只有通过校验的链接,才能完成登录。

@Servicepublic class DeviceDateService {    @Autowired    private BaseRedisUtil baseRedisUtil;    private static final String IEMI = "imei";    private static final String DID = "did";    /**     * 打开会话     * @param session     */    public void onOpen(WebSocketSession session) {        Map
map = getMap(session); RedisDeviceVO vo = baseRedisUtil.get(map.get(IEMI)); if (vo == null){ throw new RuntimeException("设备未注册"); } if (vo.getId().equals(map.get(DID))){ throw new RuntimeException("设备号错误"); } WebSocketUtil.put(map.get(DID),session); } /**8 * 关闭会话 * @param session */ public void onClose(WebSocketSession session) throws IOException { Map
map = getMap(session); WebSocketUtil.remove(map.get(DID),session); session.close(); } /** * 异常处理 * @param session * @param error */ public void onError(WebSocketSession session, Throwable error) throws IOException { if (error instanceof RuntimeException){ session.sendMessage(new TextMessage(error.getMessage())); }else { session.sendMessage(new TextMessage("系统异常")); } onClose(session); } private Map
getMap(WebSocketSession session){ String query = session.getUri().getQuery(); if (StringUtils.isEmpty(query)){ throw new RuntimeException("参数错误"); } String[] param = query.split("&"); if (param.length != 2){ throw new RuntimeException("参数错误"); } Map
map = new HashMap<>(); map.put(IEMI,param[0]); map.put(DID,param[1]); return map; }}

web socket 链接缓存

对于已经登录成功的链接,需要缓存session,而一个设备可能同时有好几个web socket同时监控,所以这里按照一对多的形式保存session。session是不同的线程同时在创建和销毁,属于并发操作,所以这里使用并发集合进行处理。

public class WebSocketUtil {    private static final Map
> SESSION_MAP = new HashMap<>(); public static void put(String did,WebSocketSession session){ CopyOnWriteArrayList
list = SESSION_MAP.get(did); if (list == null){ list = new CopyOnWriteArrayList(); SESSION_MAP.put(did,list); } list.add(session); } public static void remove(String did,WebSocketSession session){ CopyOnWriteArrayList
list = SESSION_MAP.get(did); if (CollectionUtils.isEmpty(list)){ return; } int index = -1; Iterator
it = list.iterator(); while (it.hasNext()){ index++; WebSocketSession se = it.next(); if (se == session){ list.remove(index); return; } } if (CollectionUtils.isEmpty(list)){ SESSION_MAP.remove(did); } } public static boolean isEmpty(){ return SESSION_MAP.size() > 0 ? false : true; } public static CopyOnWriteArrayList
findSocketConnect(String did){ return SESSION_MAP.get(did); }}

kafka 监听

订阅kakfa的下行数据(mapping 数据),判断是否有设备链接,如果有就把数据发送给对应的session。

@Componentpublic class DeviceListener {    @KafkaListener(topics = DOWN_TOPIC)    public void listener(String msg){        System.out.println(msg);        if (WebSocketUtil.isEmpty()){            return;        }        KafkaDownVO vo = JSONObject.parseObject(msg,KafkaDownVO.class);        List
socket = WebSocketUtil.findSocketConnect( String.valueOf(vo.getDeviceId())); if (CollectionUtils.isEmpty(socket)){ return; } TextMessage textMessage = new TextMessage(msg); socket.forEach(session -> { try { session.sendMessage(textMessage); } catch (IOException e) { e.printStackTrace(); } }); }}

application.properties

#访问端口server.port=8082#项目路径server.servlet.context-path=/device-data#项目名称spring.application.name=device-dataspring.kafka.consumer.group-id=websocket-device-data

启动项目,创建web socket链接,验证数据是否推送到了页面。

笔者这里使用的是web socket在线测试工具,

结束语

iot-pt,我们已经实现了整个数据的上行,从设备的注入,设备发送数据,Mapping设备数据,pgsql持久化数据,web socket推送实时数据,已经有一个完整的流程了,所以接下来就要使用spring cloud搭建分布式了。

转载于:https://my.oschina.net/u/2258281/blog/3039997

你可能感兴趣的文章
css3 nth-child 与 nth-of-type 的区别
查看>>
BZOJ3110:[ZJOI2013]K大数查询(整体二分)
查看>>
var decode = [+!+[]+[+[]]]+[!+[]+!+[]+!+[]]+[!+[]+!+[]+!+[]+!+[]+!+[]+!+[]]+[+[]];
查看>>
TOJ 1139.Compromise
查看>>
为iOS7重新设计你的App
查看>>
Python Websocket消息推送---GoEasy
查看>>
lnmp或ngnix下codeigniter配置
查看>>
单点登录CAS使用记(六):单点登出、单点注销
查看>>
Android入门(一)
查看>>
swift 拖拽悬浮窗
查看>>
【Unity】4.2 提升开发效率的捷径--导入 Unity 5.3.4 自带的资源包
查看>>
网页内插入百度、谷歌搜索引擎
查看>>
JS生成1000个数字加字母的不重复的随机字符串
查看>>
函数的防抖与节流
查看>>
【Java爬虫】爬取南通大学教务处成绩
查看>>
[ZOJ 1015]Fishing Net(MCS弦图的判定)
查看>>
通过样式改变图片明暗度,不是透明度哦
查看>>
对于右侧文字过多会跑到左侧的问题
查看>>
react 用value 直接赋值 必须用onchange的解决方式
查看>>
Front End中Javascript兼容问题收集(转)
查看>>