From d33b3637a569e94676d0251bc5a4ba521c630a3e Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Thu, 9 Apr 2026 09:05:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E6=80=81=E4=BA=8B=E4=BB=B6=E6=8E=A8?= =?UTF-8?q?=E9=80=81=E7=94=B1MQTT=E5=88=87=E6=8D=A2=E6=88=90Websocket?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pqs-event/event-common/pom.xml | 6 + .../service/impl/EventDetailServiceImpl.java | 19 ++- .../common/websocket/WebSocketConfig.java | 41 +++++ .../common/websocket/WebSocketServer.java | 147 ++++++++++++++++++ 4 files changed, 210 insertions(+), 3 deletions(-) create mode 100644 pqs-event/event-common/src/main/java/com/njcn/event/common/websocket/WebSocketConfig.java create mode 100644 pqs-event/event-common/src/main/java/com/njcn/event/common/websocket/WebSocketServer.java diff --git a/pqs-event/event-common/pom.xml b/pqs-event/event-common/pom.xml index c400005d2..0b0a5417c 100644 --- a/pqs-event/event-common/pom.xml +++ b/pqs-event/event-common/pom.xml @@ -36,6 +36,12 @@ compile + + org.springframework.boot + spring-boot-starter-websocket + 2.7.12 + + diff --git a/pqs-event/event-common/src/main/java/com/njcn/event/common/service/impl/EventDetailServiceImpl.java b/pqs-event/event-common/src/main/java/com/njcn/event/common/service/impl/EventDetailServiceImpl.java index 7c2f68217..b11ac42fe 100644 --- a/pqs-event/event-common/src/main/java/com/njcn/event/common/service/impl/EventDetailServiceImpl.java +++ b/pqs-event/event-common/src/main/java/com/njcn/event/common/service/impl/EventDetailServiceImpl.java @@ -7,7 +7,7 @@ import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.github.tocrhz.mqtt.publisher.MqttPublisher; +//import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.advance.api.EventCauseFeignClient; import com.njcn.advance.pojo.dto.EventAnalysisDTO; import com.njcn.common.utils.PubUtils; @@ -18,6 +18,7 @@ import com.njcn.device.pq.pojo.po.DeptLine; import com.njcn.device.pq.pojo.vo.AreaLineInfoVO; import com.njcn.device.pq.pojo.vo.LineDetailDataVO; import com.njcn.event.common.mapper.RmpEventDetailMapper; +import com.njcn.event.common.websocket.WebSocketServer; import com.njcn.event.pojo.vo.SendEventVO; import com.njcn.event.utils.EventUtil; import com.njcn.event.pojo.dto.EventDeatilDTO; @@ -29,7 +30,9 @@ import com.njcn.system.api.DicDataFeignClient; import com.njcn.system.enums.DicDataEnum; import com.njcn.system.pojo.po.DictData; import com.njcn.user.api.DeptFeignClient; +import com.njcn.user.api.UserFeignClient; import com.njcn.user.pojo.po.Dept; +import com.njcn.user.pojo.po.User; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.influxdb.dto.QueryResult; @@ -44,6 +47,7 @@ import java.math.RoundingMode; import java.time.Duration; import java.time.LocalDateTime; import java.util.*; +import java.util.stream.Collectors; /** * @author denghuajun @@ -57,11 +61,14 @@ public class EventDetailServiceImpl extends ServiceImpl deptList = dept.stream().collect(Collectors.toList()); + + List data = userFeignClient.getUserInfoByDeptIds(deptList).getData(); SendEventVO vo = new SendEventVO(); vo.setDeptList(dept); vo.setTime(po.getStartTime()); @@ -317,7 +327,10 @@ public class EventDetailServiceImpl extends ServiceImpl sessions = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap lastHeartbeatTime = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap heartbeatExecutors = new ConcurrentHashMap<>(); + private static final long HEARTBEAT_TIMEOUT = 60; // 60秒超时 + + @OnOpen + public void onOpen(Session session, @PathParam("userId") String userId) { + if (StrUtil.isNotBlank(userId)) { + sessions.put(userId, session); + lastHeartbeatTime.put(userId, System.currentTimeMillis()); + sendMessage(session, "连接成功"); + System.out.println("用户 " + userId + " 已连接"); + + // 启动心跳检测 + startHeartbeat(session, userId); + } else { + try { + session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "用户ID不能为空")); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + @OnMessage + public void onMessage(String message, Session session, @PathParam("userId") String userId) { + if ("alive".equalsIgnoreCase(message)) { + // 更新最后心跳时间 + lastHeartbeatTime.put(userId, System.currentTimeMillis()); + sendMessage(session, "over"); + } else { + // 处理业务消息 + System.out.println("收到用户 " + userId + " 的消息: " + message); + // TODO: 处理业务逻辑 + } + } + + @OnClose + public void onClose(Session session, CloseReason closeReason, @PathParam("userId") String userId) { + // 移除用户并取消心跳检测 + sessions.remove(userId); + lastHeartbeatTime.remove(userId); + ScheduledExecutorService executor = heartbeatExecutors.remove(userId); + if (executor != null) { + executor.shutdownNow(); + } + System.out.println("用户 " + userId + " 已断开连接,状态码: " + closeReason.getCloseCode()); + } + + @OnError + public void onError(Session session, Throwable throwable, @PathParam("userId") String userId) { + System.out.println("用户 " + userId + " 发生错误: " + throwable.getMessage()); + try { + session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "发生错误")); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void sendMessageToUser(String userId, String message) { + Session session = sessions.get(userId); + if (session != null && session.isOpen()) { + try { + session.getBasicRemote().sendText(message); + } catch (IOException e) { + System.out.println("发送消息给用户 " + userId + " 失败: " + e.getMessage()); + } + } else { + System.out.println("webSocket用户 " + userId + " 不在线或会话已关闭"); + } + } + + private final Object lock = new Object(); + + public void sendMessageToAll(String message) { + sessions.forEach((userId, session) -> { + System.out.println("给用户推送消息" + userId); + if (session.isOpen()) { + synchronized (lock) { + try { + session.getBasicRemote().sendText(message); + } catch (IOException e) { + System.out.println("发送消息给用户 " + userId + " 失败: " + e.getMessage()); + } + } + } + }); + } + + private void sendMessage(Session session, String message) { + try { + session.getBasicRemote().sendText(message); + } catch (IOException e) { + System.out.println("发送消息失败: " + e.getMessage()); + } + } + + private void startHeartbeat(Session session, String userId) { + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + heartbeatExecutors.put(userId, executor); + + // 定期检查心跳 + executor.scheduleAtFixedRate(() -> { + long lastTime = lastHeartbeatTime.getOrDefault(userId, 0L); + long currentTime = System.currentTimeMillis(); + + // 如果超过30秒没有收到心跳 + if (currentTime - lastTime > HEARTBEAT_TIMEOUT * 1000) { + try { + System.out.println("用户 " + userId + " 心跳超时,关闭连接"); + session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时")); + } catch (IOException e) { + System.out.println("关闭用户 " + userId + " 连接时出错: " + e.getMessage()); + } + executor.shutdown(); + heartbeatExecutors.remove(userId); + } + }, 0, 5, TimeUnit.SECONDS); // 每5秒检查一次 + } +} \ No newline at end of file