暂态事件推送由MQTT切换成Websocket
This commit is contained in:
@@ -36,6 +36,12 @@
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||
<version>2.7.12</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ import cn.hutool.core.util.StrUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||
//import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||
import com.njcn.advance.api.EventCauseFeignClient;
|
||||
import com.njcn.advance.pojo.dto.EventAnalysisDTO;
|
||||
import com.njcn.common.utils.PubUtils;
|
||||
@@ -18,6 +18,7 @@ import com.njcn.device.pq.pojo.po.DeptLine;
|
||||
import com.njcn.device.pq.pojo.vo.AreaLineInfoVO;
|
||||
import com.njcn.device.pq.pojo.vo.LineDetailDataVO;
|
||||
import com.njcn.event.common.mapper.RmpEventDetailMapper;
|
||||
import com.njcn.event.common.websocket.WebSocketServer;
|
||||
import com.njcn.event.pojo.vo.SendEventVO;
|
||||
import com.njcn.event.utils.EventUtil;
|
||||
import com.njcn.event.pojo.dto.EventDeatilDTO;
|
||||
@@ -29,7 +30,9 @@ import com.njcn.system.api.DicDataFeignClient;
|
||||
import com.njcn.system.enums.DicDataEnum;
|
||||
import com.njcn.system.pojo.po.DictData;
|
||||
import com.njcn.user.api.DeptFeignClient;
|
||||
import com.njcn.user.api.UserFeignClient;
|
||||
import com.njcn.user.pojo.po.Dept;
|
||||
import com.njcn.user.pojo.po.User;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.influxdb.dto.QueryResult;
|
||||
@@ -44,6 +47,7 @@ import java.math.RoundingMode;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author denghuajun
|
||||
@@ -57,11 +61,14 @@ public class EventDetailServiceImpl extends ServiceImpl<RmpEventDetailMapper, Rm
|
||||
|
||||
private final InfluxDbUtils influxDbUtils;
|
||||
private final DicDataFeignClient dicDataFeignClient;
|
||||
private final MqttPublisher publisher;
|
||||
// private final MqttPublisher publisher;
|
||||
|
||||
private final WebSocketServer webSocketServer;
|
||||
private final CommTerminalGeneralClient commTerminalGeneralClient;
|
||||
private final LineFeignClient lineFeignClient;
|
||||
private final DeptLineFeignClient deptLineFeignClient;
|
||||
private final DeptFeignClient deptFeignClient;
|
||||
private final UserFeignClient userFeignClient;
|
||||
|
||||
private final EventCauseFeignClient eventCauseFeignClient;
|
||||
@Override
|
||||
@@ -306,6 +313,9 @@ public class EventDetailServiceImpl extends ServiceImpl<RmpEventDetailMapper, Rm
|
||||
String[] idsArray = deptInfo.getPids().split(",");
|
||||
dept.addAll(Arrays.asList(idsArray));
|
||||
});
|
||||
List<String> deptList = dept.stream().collect(Collectors.toList());
|
||||
|
||||
List<User> data = userFeignClient.getUserInfoByDeptIds(deptList).getData();
|
||||
SendEventVO vo = new SendEventVO();
|
||||
vo.setDeptList(dept);
|
||||
vo.setTime(po.getStartTime());
|
||||
@@ -317,7 +327,10 @@ public class EventDetailServiceImpl extends ServiceImpl<RmpEventDetailMapper, Rm
|
||||
vo.setLineName(lineInfoVOList.get(0).getLineName());
|
||||
vo.setPowerCompany(lineInfoVOList.get(0).getGdName());
|
||||
vo.setSubstation(lineInfoVOList.get(0).getSubName());
|
||||
publisher.send("/sendEvent", PubUtils.obj2json(vo), 1, false);
|
||||
for (User datum : data) {
|
||||
webSocketServer.sendMessageToUser(datum.getId(),PubUtils.obj2json(vo));
|
||||
}
|
||||
// publisher.send("/sendEvent", PubUtils.obj2json(vo), 1, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.njcn.event.common.websocket;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
||||
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
|
||||
|
||||
/**
|
||||
* Description:
|
||||
* Date: 2024/12/13 15:09【需求编号】
|
||||
*
|
||||
* @author clam
|
||||
* @version V1.0.0
|
||||
*/
|
||||
@Configuration
|
||||
public class WebSocketConfig {
|
||||
|
||||
@Bean
|
||||
public ServerEndpointExporter serverEndpointExporter() {
|
||||
return new ServerEndpointExporter();
|
||||
}
|
||||
|
||||
/**
|
||||
* 通信文本消息和二进制缓存区大小
|
||||
* 避免对接 第三方 报文过大时,Websocket 1009 错误
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
|
||||
@Bean
|
||||
public ServletServerContainerFactoryBean createWebSocketContainer() {
|
||||
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
|
||||
// 在此处设置bufferSize
|
||||
container.setMaxTextMessageBufferSize(10240000);
|
||||
container.setMaxBinaryMessageBufferSize(10240000);
|
||||
container.setMaxSessionIdleTimeout(15 * 60000L);
|
||||
return container;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,147 @@
|
||||
package com.njcn.event.common.websocket;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.websocket.*;
|
||||
import javax.websocket.server.PathParam;
|
||||
import javax.websocket.server.ServerEndpoint;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Description:
|
||||
* Date: 2024/12/13 15:11【需求编号】
|
||||
*
|
||||
* @author clam
|
||||
* @version V1.0.0
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ServerEndpoint(value = "/event/{userId}")
|
||||
public class WebSocketServer {
|
||||
|
||||
private static final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<>();
|
||||
private static final ConcurrentHashMap<String, Long> lastHeartbeatTime = new ConcurrentHashMap<>();
|
||||
private static final ConcurrentHashMap<String, ScheduledExecutorService> heartbeatExecutors = new ConcurrentHashMap<>();
|
||||
private static final long HEARTBEAT_TIMEOUT = 60; // 60秒超时
|
||||
|
||||
@OnOpen
|
||||
public void onOpen(Session session, @PathParam("userId") String userId) {
|
||||
if (StrUtil.isNotBlank(userId)) {
|
||||
sessions.put(userId, session);
|
||||
lastHeartbeatTime.put(userId, System.currentTimeMillis());
|
||||
sendMessage(session, "连接成功");
|
||||
System.out.println("用户 " + userId + " 已连接");
|
||||
|
||||
// 启动心跳检测
|
||||
startHeartbeat(session, userId);
|
||||
} else {
|
||||
try {
|
||||
session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "用户ID不能为空"));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OnMessage
|
||||
public void onMessage(String message, Session session, @PathParam("userId") String userId) {
|
||||
if ("alive".equalsIgnoreCase(message)) {
|
||||
// 更新最后心跳时间
|
||||
lastHeartbeatTime.put(userId, System.currentTimeMillis());
|
||||
sendMessage(session, "over");
|
||||
} else {
|
||||
// 处理业务消息
|
||||
System.out.println("收到用户 " + userId + " 的消息: " + message);
|
||||
// TODO: 处理业务逻辑
|
||||
}
|
||||
}
|
||||
|
||||
@OnClose
|
||||
public void onClose(Session session, CloseReason closeReason, @PathParam("userId") String userId) {
|
||||
// 移除用户并取消心跳检测
|
||||
sessions.remove(userId);
|
||||
lastHeartbeatTime.remove(userId);
|
||||
ScheduledExecutorService executor = heartbeatExecutors.remove(userId);
|
||||
if (executor != null) {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
System.out.println("用户 " + userId + " 已断开连接,状态码: " + closeReason.getCloseCode());
|
||||
}
|
||||
|
||||
@OnError
|
||||
public void onError(Session session, Throwable throwable, @PathParam("userId") String userId) {
|
||||
System.out.println("用户 " + userId + " 发生错误: " + throwable.getMessage());
|
||||
try {
|
||||
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "发生错误"));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void sendMessageToUser(String userId, String message) {
|
||||
Session session = sessions.get(userId);
|
||||
if (session != null && session.isOpen()) {
|
||||
try {
|
||||
session.getBasicRemote().sendText(message);
|
||||
} catch (IOException e) {
|
||||
System.out.println("发送消息给用户 " + userId + " 失败: " + e.getMessage());
|
||||
}
|
||||
} else {
|
||||
System.out.println("webSocket用户 " + userId + " 不在线或会话已关闭");
|
||||
}
|
||||
}
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
public void sendMessageToAll(String message) {
|
||||
sessions.forEach((userId, session) -> {
|
||||
System.out.println("给用户推送消息" + userId);
|
||||
if (session.isOpen()) {
|
||||
synchronized (lock) {
|
||||
try {
|
||||
session.getBasicRemote().sendText(message);
|
||||
} catch (IOException e) {
|
||||
System.out.println("发送消息给用户 " + userId + " 失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void sendMessage(Session session, String message) {
|
||||
try {
|
||||
session.getBasicRemote().sendText(message);
|
||||
} catch (IOException e) {
|
||||
System.out.println("发送消息失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void startHeartbeat(Session session, String userId) {
|
||||
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||
heartbeatExecutors.put(userId, executor);
|
||||
|
||||
// 定期检查心跳
|
||||
executor.scheduleAtFixedRate(() -> {
|
||||
long lastTime = lastHeartbeatTime.getOrDefault(userId, 0L);
|
||||
long currentTime = System.currentTimeMillis();
|
||||
|
||||
// 如果超过30秒没有收到心跳
|
||||
if (currentTime - lastTime > HEARTBEAT_TIMEOUT * 1000) {
|
||||
try {
|
||||
System.out.println("用户 " + userId + " 心跳超时,关闭连接");
|
||||
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
||||
} catch (IOException e) {
|
||||
System.out.println("关闭用户 " + userId + " 连接时出错: " + e.getMessage());
|
||||
}
|
||||
executor.shutdown();
|
||||
heartbeatExecutors.remove(userId);
|
||||
}
|
||||
}, 0, 5, TimeUnit.SECONDS); // 每5秒检查一次
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user