From fe5040c7af609e2b1e525ba96f688ad9acdca68b Mon Sep 17 00:00:00 2001 From: hongawen <83944980@qq.com> Date: Mon, 11 Aug 2025 16:31:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/SocketDevResponseService.java | 2 +- .../service/impl/PreDetectionServiceImpl.java | 3 +- .../socket/cilent/NettyDevClientHandler.java | 52 +++++---- .../socket/websocket/WebServiceManager.java | 108 ++++++++++-------- 4 files changed, 94 insertions(+), 71 deletions(-) diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java index 8333b485..38ec4156 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java @@ -857,7 +857,7 @@ public class SocketDevResponseService { webSocketVO.setData(JSON.toJSONString(map)); webSocketVO.setRequestId(SourceOperateCodeEnum.YJC_XYJY.getValue()); webSocketVO.setOperateCode(SourceOperateCodeEnum.VERIFY_MAPPING$01.getValue()); - webSocketVO.setCode(SourceResponseCodeEnum.SUCCESS.getCode()); + webSocketVO.setCode(SourceResponseCodeEnum.FAIL.getCode()); WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(webSocketVO)); CnSocketUtil.quitSend(param); break; diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index 8d2f3b9c..9661371d 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -236,8 +236,9 @@ public class PreDetectionServiceImpl implements PreDetectionService { xuMsg.setData(JSON.toJSONString(sourceIssues)); xuMsg.setRequestId(SourceOperateCodeEnum.FORMAL_REAL.getValue() + "&&" + sourceIssues.getType()); SocketManager.sendMsg(param.getUserPageId() + DetectionCommunicateConstant.SOURCE, JSON.toJSONString(xuMsg)); + // Resume_Success } else { - //TODO 是否最终检测完成需要推送给用户 + //TODO 是否最终检测完成需要推送给用户 检测完成 PqScriptCheckDataParam checkDataParam = new PqScriptCheckDataParam(); checkDataParam.setScriptId(param.getScriptId()); checkDataParam.setIsValueTypeName(false); diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java index 87b9a55f..d61dc2b0 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyDevClientHandler.java @@ -41,7 +41,7 @@ import java.util.stream.Collectors; *
  • 心跳超时处理
  • *
  • 异常处理和恢复
  • * - * + * * @Description: 设备客户端业务处理器 * @Author: wr * @Date: 2024/12/10 14:16 @@ -51,19 +51,29 @@ import java.util.stream.Collectors; @RequiredArgsConstructor public class NettyDevClientHandler extends SimpleChannelInboundHandler { - /** 闪变检测超时时间:20分钟(1300秒) */ + /** + * 闪变检测超时时间:20分钟(1300秒) + */ private static final long FLICKER_TIMEOUT = 1300L; - - /** 统计数据检测超时时间:3分钟(180秒) */ + + /** + * 统计数据检测超时时间:3分钟(180秒) + */ private static final long STATISTICS_TIMEOUT = 180L; - - /** 实时数据检测超时时间:1分钟(60秒) */ + + /** + * 实时数据检测超时时间:1分钟(60秒) + */ private static final long REALTIME_TIMEOUT = 60L; - - /** 暂停操作超时时间:10分钟(600秒) */ + + /** + * 暂停操作超时时间:10分钟(600秒) + */ private static final long STOP_TIMEOUT = 600L; - - /** 超时时默认结果标志:3表示超时失败 */ + + /** + * 超时时默认结果标志:3表示超时失败 + */ private static final int DEFAULT_RESULT_FLAG = 3; private final PreDetectionParam param; @@ -81,7 +91,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客户端通道已建立: {}", ctx.channel().id()); - + // 检查是否存在同一用户的老连接 Channel channel = SocketManager.getChannelByUserId(param.getUserPageId() + CnSocketUtil.DEV_TAG); if (Objects.nonNull(channel)) { @@ -117,10 +127,9 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { * * @param ctx 通道上下文 * @param msg 接收到的消息 - * @throws InterruptedException 线程中断异常 */ @Override - protected void channelRead0(ChannelHandlerContext ctx, String msg) throws InterruptedException { + protected void channelRead0(ChannelHandlerContext ctx, String msg) { log.info("devhandler接收server端数据: {}", msg); try { socketResponseService.deal(param, msg); @@ -130,7 +139,6 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { } } - /** * 用户事件回调方法,主要用于处理心跳超时 @@ -171,7 +179,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { * 异常捕获处理 *

    捕获并处理各种类型的异常,执行清理工作

    * - * @param ctx 通道上下文 + * @param ctx 通道上下文 * @param cause 异常原因 */ @Override @@ -208,8 +216,6 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { } - - /** * 处理读取超时事件 *

    检查源列表,更新超时计数器,判断是否超时并处理

    @@ -241,7 +247,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { log.warn("当前进入暂停操作超时函数,停止时间: {}", FormalTestManager.stopTime); if (FormalTestManager.stopTime > STOP_TIMEOUT) { CnSocketUtil.quitSend(param); - WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(), SourceOperateCodeEnum.STOP_TIMEOUT); + WebServiceManager.sendDetectionErrorMessage(param.getUserPageId(),SourceOperateCodeEnum.FORMAL_REAL.getValue(), SourceOperateCodeEnum.STOP_TIMEOUT); } } @@ -275,7 +281,7 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { private boolean isTimeout(SourceIssue sourceIssue) { long currentTime = SocketManager.clockMap.get(sourceIssue.getIndex()); String type = sourceIssue.getType(); - + // 根据不同检测类型使用不同的超时阈值 if (DicDataEnum.F.getCode().equals(type)) { // 闪变检测:需要更长时间,20分钟超时 @@ -312,12 +318,12 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { List devListRes = FormalTestManager.devList.stream() .map(this::createTimeoutResult) .collect(Collectors.toList()); - + WebSocketVO> socketVO = new WebSocketVO<>(); socketVO.setRequestId(sourceIssue.getType() + CnSocketUtil.END_TAG); socketVO.setOperateCode(sourceIssue.getType()); socketVO.setData(devListRes); - + WebServiceManager.sendMsg(param.getUserPageId(), JSON.toJSONString(socketVO)); } @@ -332,12 +338,12 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler { DevLineTestResult devLineTestResult = new DevLineTestResult(); devLineTestResult.setDeviceId(dev.getDevId()); devLineTestResult.setDeviceName(dev.getDevName()); - + Integer[] resultFlags = dev.getMonitorList().stream() .map(monitor -> DEFAULT_RESULT_FLAG) .toArray(Integer[]::new); devLineTestResult.setChnResult(resultFlags); - + return devLineTestResult; } diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java index a8133848..dde35b8b 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/websocket/WebServiceManager.java @@ -16,17 +16,17 @@ import java.util.concurrent.ConcurrentHashMap; /** * WebSocket会话管理器 - * + * *

    负责管理电能质量检测系统中的WebSocket连接会话和检测参数,主要功能包括:

    *
      *
    • WebSocket连接会话的添加、删除和管理
    • *
    • 向指定用户推送实时消息(文本消息和结构化消息)
    • *
    • 全局检测参数的存储和管理
    • *
    - * + * *

    线程安全性:

    * 使用ConcurrentHashMap确保在高并发环境下的线程安全。 - * + * *

    使用场景:

    *
      *
    • 检测进度实时推送
    • @@ -34,20 +34,19 @@ import java.util.concurrent.ConcurrentHashMap; *
    • 设备状态变更通知
    • *
    • 异常信息推送
    • *
    - * + * *

    消息推送方式:

    *
      *
    • {@link #sendMsg(String, String)} - 发送纯文本消息
    • *
    • {@link #sendMessage(String, WebSocketVO)} - 发送结构化JSON消息
    • *
    - * + * * @author wr - * @date 2024/12/11 13:04 * @version 1.0 - * @since 检测系统 v2.3.12 - * + * @date 2024/12/11 13:04 * @see com.njcn.gather.detection.util.socket.websocket.WebSocketHandler WebSocket处理器 * @see com.njcn.gather.detection.pojo.vo.WebSocketVO WebSocket消息对象 + * @since 检测系统 v2.3.12 */ @Slf4j public class WebServiceManager { @@ -67,8 +66,8 @@ public class WebServiceManager { /** * 添加用户WebSocket会话 - * - * @param userId 用户ID,不能为null + * + * @param userId 用户ID,不能为null * @param channel WebSocket连接通道,不能为null */ public static void addUser(String userId, Channel channel) { @@ -79,7 +78,7 @@ public class WebServiceManager { /** * 根据用户ID移除会话(推荐使用) * 时间复杂度:O(1) - * + * * @param userId 用户ID * @return 被移除的Channel,如果不存在则返回null */ @@ -90,7 +89,7 @@ public class WebServiceManager { /** * 根据channelId移除会话(兼容老版本) * 时间复杂度:O(n),建议使用removeByUserId替代 - * + * * @param channelId 通道ID * @deprecated 建议使用 {@link #removeByUserId(String)} 替代 */ @@ -110,9 +109,9 @@ public class WebServiceManager { /** * 发送纯文本消息给指定用户 - * + * * @param userId 目标用户ID - * @param msg 要发送的文本消息 + * @param msg 要发送的文本消息 */ public static void sendMsg(String userId, String msg) { Channel channel = userSessions.get(userId); @@ -127,8 +126,8 @@ public class WebServiceManager { /** * 发送结构化消息给指定用户 * 消息会被序列化为JSON格式后发送 - * - * @param userId 目标用户ID + * + * @param userId 目标用户ID * @param webSocketVO 要发送的结构化消息对象 */ public static void sendMessage(String userId, WebSocketVO webSocketVO) { @@ -144,8 +143,8 @@ public class WebServiceManager { /** * 存储检测参数(基于用户ID) * 支持多用户并发检测,每个用户的检测参数独立存储 - * - * @param userId 用户ID(登录名) + * + * @param userId 用户ID(登录名) * @param preDetectionParam 检测参数对象 * @throws IllegalArgumentException 当userId或检测参数为空时抛出 */ @@ -158,22 +157,22 @@ public class WebServiceManager { } preDetectionParamMap.put(userId, preDetectionParam); } - - + + /** * 获取指定用户的检测参数 - * + * * @param userId 用户ID * @return 检测参数对象,如果不存在则返回null */ public static PreDetectionParam getPreDetectionParam(String userId) { return preDetectionParamMap.get(userId); } - + /** * 获取当前检测参数(兼容老版本) * 注意:该方法已废弃,建议使用 {@link #getPreDetectionParam(String)} - * + * * @return 检测参数对象,如果不存在则返回null * @deprecated 多用户并发场景下该方法不安全,请使用 {@link #getPreDetectionParam(String)} */ @@ -185,48 +184,48 @@ public class WebServiceManager { log.warn("存在多个检测参数,无法确定返回哪个,当前参数数量: {}", preDetectionParamMap.size()); return null; } - + /** * 移除指定用户的检测参数 - * + * * @param userId 用户ID * @return 被移除的检测参数,如果不存在则返回null */ public static PreDetectionParam removePreDetectionParam(String userId) { return preDetectionParamMap.remove(userId); } - + /** * 清空所有检测参数 */ public static void removeAllPreDetectionParam() { preDetectionParamMap.clear(); } - + /** * 清空所有检测参数(兼容老版本) - * + * * @deprecated 建议使用 {@link #removeAllPreDetectionParam()} 或 {@link #removePreDetectionParam(String)} */ @Deprecated public static void removePreDetectionParam() { removeAllPreDetectionParam(); } - + // ================================ 实用功能方法 ================================ - + /** * 获取当前在线用户数量 - * + * * @return 在线用户数量 */ public static int getOnlineUserCount() { return userSessions.size(); } - + /** * 检查指定用户是否在线 - * + * * @param userId 用户ID * @return true如果用户在线且连接活跃,否则返回false */ @@ -234,27 +233,27 @@ public class WebServiceManager { Channel channel = userSessions.get(userId); return Objects.nonNull(channel) && channel.isActive(); } - + /** * 获取所有在线用户ID集合 - * + * * @return 在线用户ID集合的快照 */ public static java.util.Set getOnlineUserIds() { return new java.util.HashSet<>(userSessions.keySet()); } - + // ================================ 检测消息推送方法 ================================ - + /** * 发送检测相关消息给指定用户 *

    用于推送检测状态、进度、结果等结构化消息

    - * - * @param userId 目标用户ID - * @param requestId 请求ID,用于标识消息类型和流程 + * + * @param userId 目标用户ID + * @param requestId 请求ID,用于标识消息类型和流程 * @param operateCode 操作代码,标识具体的操作类型 - * @param data 数据载荷,可以是任意类型的数据 - * @param desc 描述信息 + * @param data 数据载荷,可以是任意类型的数据 + * @param desc 描述信息 * @since v2.3.12 重构版本 */ public static void sendDetectionMessage(String userId, String requestId, String operateCode, Object data, String desc) { @@ -269,7 +268,7 @@ public class WebServiceManager { /** * 发送未知错误消息给指定用户 *

    用于处理系统无法识别的操作或未知异常情况

    - * + * * @param userId 目标用户ID * @since v2.3.12 重构版本 */ @@ -280,12 +279,12 @@ public class WebServiceManager { webSocketVO.setOperateCode(SourceOperateCodeEnum.UNKNOWN_OPERATE.getMsg()); sendMessage(userId, webSocketVO); } - + /** * 发送检测错误消息给指定用户 *

    用于推送特定类型的检测错误信息

    - * - * @param userId 目标用户ID + * + * @param userId 目标用户ID * @param errorType 错误类型枚举 * @since v2.3.12 重构版本 */ @@ -296,5 +295,22 @@ public class WebServiceManager { webSocketVO.setOperateCode(errorType.getValue()); sendMessage(userId, webSocketVO); } + + /** + * 发送检测错误消息给指定用户 + *

    用于推送特定类型的检测错误信息

    + * + * @param userId 目标用户ID + * @param requestId 请求ID + * @param errorType 错误类型枚举 + * @since v2.3.12 重构版本 + */ + public static void sendDetectionErrorMessage(String userId, String requestId, SourceOperateCodeEnum errorType) { + WebSocketVO webSocketVO = new WebSocketVO<>(); + webSocketVO.setRequestId(requestId); + webSocketVO.setData(errorType.getMsg()); + webSocketVO.setOperateCode(errorType.getValue()); + sendMessage(userId, webSocketVO); + } } \ No newline at end of file