diff --git a/pqs-event/event-common/pom.xml b/pqs-event/event-common/pom.xml
index c400005d2..0b0a5417c 100644
--- a/pqs-event/event-common/pom.xml
+++ b/pqs-event/event-common/pom.xml
@@ -36,6 +36,12 @@
compile
+
+ org.springframework.boot
+ spring-boot-starter-websocket
+ 2.7.12
+
+
diff --git a/pqs-event/event-common/src/main/java/com/njcn/event/common/service/impl/EventDetailServiceImpl.java b/pqs-event/event-common/src/main/java/com/njcn/event/common/service/impl/EventDetailServiceImpl.java
index 7c2f68217..b11ac42fe 100644
--- a/pqs-event/event-common/src/main/java/com/njcn/event/common/service/impl/EventDetailServiceImpl.java
+++ b/pqs-event/event-common/src/main/java/com/njcn/event/common/service/impl/EventDetailServiceImpl.java
@@ -7,7 +7,7 @@ import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.github.tocrhz.mqtt.publisher.MqttPublisher;
+//import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.advance.api.EventCauseFeignClient;
import com.njcn.advance.pojo.dto.EventAnalysisDTO;
import com.njcn.common.utils.PubUtils;
@@ -18,6 +18,7 @@ import com.njcn.device.pq.pojo.po.DeptLine;
import com.njcn.device.pq.pojo.vo.AreaLineInfoVO;
import com.njcn.device.pq.pojo.vo.LineDetailDataVO;
import com.njcn.event.common.mapper.RmpEventDetailMapper;
+import com.njcn.event.common.websocket.WebSocketServer;
import com.njcn.event.pojo.vo.SendEventVO;
import com.njcn.event.utils.EventUtil;
import com.njcn.event.pojo.dto.EventDeatilDTO;
@@ -29,7 +30,9 @@ import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.DictData;
import com.njcn.user.api.DeptFeignClient;
+import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.Dept;
+import com.njcn.user.pojo.po.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.dto.QueryResult;
@@ -44,6 +47,7 @@ import java.math.RoundingMode;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
+import java.util.stream.Collectors;
/**
* @author denghuajun
@@ -57,11 +61,14 @@ public class EventDetailServiceImpl extends ServiceImpl deptList = dept.stream().collect(Collectors.toList());
+
+ List data = userFeignClient.getUserInfoByDeptIds(deptList).getData();
SendEventVO vo = new SendEventVO();
vo.setDeptList(dept);
vo.setTime(po.getStartTime());
@@ -317,7 +327,10 @@ public class EventDetailServiceImpl extends ServiceImpl sessions = new ConcurrentHashMap<>();
+ private static final ConcurrentHashMap lastHeartbeatTime = new ConcurrentHashMap<>();
+ private static final ConcurrentHashMap heartbeatExecutors = new ConcurrentHashMap<>();
+ private static final long HEARTBEAT_TIMEOUT = 60; // 60秒超时
+
+ @OnOpen
+ public void onOpen(Session session, @PathParam("userId") String userId) {
+ if (StrUtil.isNotBlank(userId)) {
+ sessions.put(userId, session);
+ lastHeartbeatTime.put(userId, System.currentTimeMillis());
+ sendMessage(session, "连接成功");
+ System.out.println("用户 " + userId + " 已连接");
+
+ // 启动心跳检测
+ startHeartbeat(session, userId);
+ } else {
+ try {
+ session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "用户ID不能为空"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @OnMessage
+ public void onMessage(String message, Session session, @PathParam("userId") String userId) {
+ if ("alive".equalsIgnoreCase(message)) {
+ // 更新最后心跳时间
+ lastHeartbeatTime.put(userId, System.currentTimeMillis());
+ sendMessage(session, "over");
+ } else {
+ // 处理业务消息
+ System.out.println("收到用户 " + userId + " 的消息: " + message);
+ // TODO: 处理业务逻辑
+ }
+ }
+
+ @OnClose
+ public void onClose(Session session, CloseReason closeReason, @PathParam("userId") String userId) {
+ // 移除用户并取消心跳检测
+ sessions.remove(userId);
+ lastHeartbeatTime.remove(userId);
+ ScheduledExecutorService executor = heartbeatExecutors.remove(userId);
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ System.out.println("用户 " + userId + " 已断开连接,状态码: " + closeReason.getCloseCode());
+ }
+
+ @OnError
+ public void onError(Session session, Throwable throwable, @PathParam("userId") String userId) {
+ System.out.println("用户 " + userId + " 发生错误: " + throwable.getMessage());
+ try {
+ session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "发生错误"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void sendMessageToUser(String userId, String message) {
+ Session session = sessions.get(userId);
+ if (session != null && session.isOpen()) {
+ try {
+ session.getBasicRemote().sendText(message);
+ } catch (IOException e) {
+ System.out.println("发送消息给用户 " + userId + " 失败: " + e.getMessage());
+ }
+ } else {
+ System.out.println("webSocket用户 " + userId + " 不在线或会话已关闭");
+ }
+ }
+
+ private final Object lock = new Object();
+
+ public void sendMessageToAll(String message) {
+ sessions.forEach((userId, session) -> {
+ System.out.println("给用户推送消息" + userId);
+ if (session.isOpen()) {
+ synchronized (lock) {
+ try {
+ session.getBasicRemote().sendText(message);
+ } catch (IOException e) {
+ System.out.println("发送消息给用户 " + userId + " 失败: " + e.getMessage());
+ }
+ }
+ }
+ });
+ }
+
+ private void sendMessage(Session session, String message) {
+ try {
+ session.getBasicRemote().sendText(message);
+ } catch (IOException e) {
+ System.out.println("发送消息失败: " + e.getMessage());
+ }
+ }
+
+ private void startHeartbeat(Session session, String userId) {
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ heartbeatExecutors.put(userId, executor);
+
+ // 定期检查心跳
+ executor.scheduleAtFixedRate(() -> {
+ long lastTime = lastHeartbeatTime.getOrDefault(userId, 0L);
+ long currentTime = System.currentTimeMillis();
+
+ // 如果超过30秒没有收到心跳
+ if (currentTime - lastTime > HEARTBEAT_TIMEOUT * 1000) {
+ try {
+ System.out.println("用户 " + userId + " 心跳超时,关闭连接");
+ session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
+ } catch (IOException e) {
+ System.out.println("关闭用户 " + userId + " 连接时出错: " + e.getMessage());
+ }
+ executor.shutdown();
+ heartbeatExecutors.remove(userId);
+ }
+ }, 0, 5, TimeUnit.SECONDS); // 每5秒检查一次
+ }
+}
\ No newline at end of file