This commit is contained in:
wr
2024-12-20 13:28:17 +08:00
parent 5dafcad3fa
commit a81439b1d2
8 changed files with 117 additions and 70 deletions

View File

@@ -21,7 +21,6 @@ import com.njcn.gather.device.device.service.IPqDevService;
import com.njcn.gather.device.script.pojo.param.PqScriptIssueParam;
import com.njcn.gather.device.script.pojo.po.SourceIssue;
import com.njcn.gather.device.script.service.IPqScriptDtlsService;
import io.netty.channel.Channel;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@@ -36,6 +35,8 @@ public class SocketDevResponseService {
private final String handlerStr = "_Dev";
private final String handlerSourceStr = "_Source";
private final WebSocketHandler webSocketHandler;
private final IPqDevService iPqDevService;
private final IPqScriptDtlsService scriptDtlsService;
@@ -221,7 +222,7 @@ public class SocketDevResponseService {
successXieyi3.add(mId);
System.out.println(successXieyi3.size() + "=====" + monitorIdListComm.size());
if (successXieyi3.size() == monitorIdListComm.size()) {
System.out.println("开始相序校验++++++++++");
System.out.println("开始相序校验,向源下参数++++++++++");
PqScriptIssueParam issueParam = new PqScriptIssueParam();
issueParam.setPlanId(param.getPlanId());
issueParam.setSourceId(param.getSourceId());
@@ -233,7 +234,7 @@ public class SocketDevResponseService {
xuMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue());
xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue());
xuMsg.setData(JSON.toJSONString(sourceIssues.get(0)));
SocketManager.sendMsg(param.getUserPageId() + "_Source", JSON.toJSONString(xuMsg));
SocketManager.sendMsg(param.getUserPageId() + "_Source", MsgUtil.toJsonWithNewLinePlain(xuMsg));
}
}
}
@@ -288,18 +289,21 @@ public class SocketDevResponseService {
info.addAll(devIsSource(dev, sourceIssues.get(0)));
}
}
String s = param.getUserPageId() + "_Source";
String userSource = param.getUserPageId() + handlerSourceStr;
SocketMsg msg = new SocketMsg();
msg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue());
msg.setOperateCode(SourceOperateCodeEnum.CLOSE_GATHER.getValue());
Map<String, String> map = new HashMap<>(1);
map.put("sourceId", sourceIssues.get(0).getSourceId());
msg.setData(JSON.toJSONString(map));
SocketManager.sendMsg(s, JSON.toJSONString(msg));
SocketManager.sendMsg(userSource, MsgUtil.toJsonWithNewLinePlain(msg));
SocketManager.removeUser(userSource);
SocketManager.removeUser(param.getUserPageId() + handlerStr);
//向前端推送消息
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue());
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
socketMsg.setData(JSON.toJSONString(info));
socketMsg.setData(info);
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
}
break;
@@ -324,8 +328,13 @@ public class SocketDevResponseService {
private List<SourceCompareDev> devIsSource(DevData dev, SourceIssue issue) {
List<SourceCompareDev> info = new ArrayList<>();
String[] split = dev.getId().split("_");
String devName = null;
if (devNameMapComm.containsKey(split[0])) {
devName = devNameMapComm.get(split[0]);
}
List<SourceIssue.ChannelListDTO> channelList = issue.getChannelList();
List<DevData.SqlDataDTO> sqlData = dev.getSqlData();
@@ -333,10 +342,12 @@ public class SocketDevResponseService {
List<DevData.SqlDataDTO> dataI = sqlData.stream().filter(x -> "电流有效值".equals(x.getDesc())).collect(Collectors.toList());
if (CollUtil.isNotEmpty(dataV)) {
SourceCompareDev compareDev = getSourceCompareDev(split, dataV, "电压有效值", "U", channelList);
compareDev.setDevName(devName);
info.add(compareDev);
}
if (CollUtil.isNotEmpty(dataI)) {
SourceCompareDev compareDev = getSourceCompareDev(split, dataI, "电流有效值", "I", channelList);
compareDev.setDevName(devName);
info.add(compareDev);
}
@@ -360,6 +371,7 @@ public class SocketDevResponseService {
List<SourceIssue.ChannelListDTO> channelList
) {
SourceCompareDev compareDev = new SourceCompareDev();
compareDev.setIp(split[0]);
compareDev.setLineNum(split[1]);
compareDev.setDesc(name);
@@ -413,6 +425,7 @@ public class SocketDevResponseService {
this.successComm = new ArrayList<>();
this.successXieyi = new ArrayList<>();
this.successXieyi3 = new ArrayList<>();
//初始化相序集合
this.devInfo = new ArrayList<>();
this.success = new ArrayList<>();

View File

@@ -15,7 +15,6 @@ import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler;
import com.njcn.gather.detection.util.socket.web.WebSocketHandler;
import com.njcn.gather.device.device.pojo.vo.PreDetection;
import com.njcn.gather.device.device.service.IPqDevService;
import io.netty.channel.Channel;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@@ -37,12 +36,15 @@ public class SocketSourceResponseService {
private final SocketDevResponseService socketDevResponseService;
private final IPqDevService iPqDevService;
private final String DEV = "_Dev";
@Value("${socket.device.ip}")
private String ip;
@Value("${socket.device.port}")
private Integer port;
public void deal(PreDetectionParam param, String msg) {
SocketDataMsg socketDataMsg = MsgUtil.socketDataMsg(msg);
SourceOperateCodeEnum enumByCode = SourceOperateCodeEnum.getDictDataEnumByCode(socketDataMsg.getRequestId());
@@ -65,20 +67,24 @@ public class SocketSourceResponseService {
/**
* 装置检测(当源初始化成功后,直接向装置通道向装置服务器发送,装置检测)
*
* @param param
* @param socketDataMsg
*/
private void detectionDev(PreDetectionParam param, SocketDataMsg socketDataMsg) {
SourceResponseCodeEnum dictDataEnumByCode = SourceResponseCodeEnum.getDictDataEnumByCode(socketDataMsg.getCode());
if (ObjectUtil.isNotNull(dictDataEnumByCode)) {
SocketMsg socketMsg=new SocketMsg();
SocketMsg socketMsg;
switch (dictDataEnumByCode) {
case SUCCESS:
//todo 前端推送收到的消息暂未处理好
webSocketHandler.sendMsgToUser(param.getUserPageId(), "msg");
String s = param.getUserPageId() + "_Dev";
//开始设备通讯检测(发送设备初始化)
socketMsg = new SocketMsg();
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue());
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
socketMsg.setData("源通讯校验成功");
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
SocketManager.removeUser(param.getUserPageId()+"_Source");
//开始设备通讯检测(发送设备初始化)
List<PreDetection> devList = iPqDevService.getDevInfo(param.getDevIds());
Map<String, List<PreDetection>> map = new HashMap(1);
map.put("deviceList", devList);
@@ -88,11 +94,11 @@ public class SocketSourceResponseService {
socketMsg.setData(jsonString);
String json = JSON.toJSONString(socketMsg);
NettyClient.socketClient(ip, port, param.getUserPageId(), json, new NettyDevClientHandler(param, socketDevResponseService));
// SocketManager.sendMsg(s,json);
break;
case UNPROCESSED_BUSINESS:
break;
default:
socketMsg = new SocketMsg();
socketMsg.setRequestId(socketDataMsg.getRequestId());
socketMsg.setOperateCode(socketDataMsg.getOperateCode());
socketMsg.setData(dictDataEnumByCode.getMessage());
@@ -104,6 +110,7 @@ public class SocketSourceResponseService {
/**
* 相序检测向装置发送(当装置初始成功后,会向源发送加量请求。收到加量请求成功后会向装置发送参数下发。)
*
* @param param
* @param socketDataMsg
*/
@@ -119,8 +126,7 @@ public class SocketSourceResponseService {
socketMsg.setData(dictDataEnumByCode.getMessage());
webSocketHandler.sendMsgToUser(param.getUserPageId(), JSON.toJSONString(socketMsg));
String s = param.getUserPageId() + "_Dev";
String s = param.getUserPageId() + DEV;
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_XUJY.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.DEV_DATA_REQUEST_02.getValue());
List<PreDetection> pqDevList = iPqDevService.getDevInfo(param.getDevIds());

View File

@@ -1,11 +1,9 @@
package com.njcn.gather.detection.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.gather.detection.handler.SocketDevResponseService;
import com.njcn.gather.detection.handler.SocketSourceResponseService;
@@ -15,9 +13,9 @@ import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.pojo.vo.SocketMsg;
import com.njcn.gather.detection.service.PreDetectionService;
import com.njcn.gather.detection.util.socket.MsgUtil;
import com.njcn.gather.detection.util.socket.SocketManager;
import com.njcn.gather.detection.util.socket.cilent.NettyClient;
import com.njcn.gather.detection.util.socket.cilent.NettyDevClientHandler;
import com.njcn.gather.detection.util.socket.cilent.NettySourceClientHandler;
import com.njcn.gather.device.device.service.IPqDevService;
import com.njcn.gather.device.script.service.IPqScriptDtlsService;
@@ -34,8 +32,6 @@ import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
@@ -102,7 +98,7 @@ public class PreDetectionServiceImpl implements PreDetectionService {
msg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue());
msg.setData(JSON.toJSONString(sourceParam));
param.setSourceId(sourceParam.getSourceId());
NettyClient.socketClient(ip, port, param.getUserPageId(), JSON.toJSONString(msg), new NettySourceClientHandler(param, sourceResponseService));
NettyClient.socketClient(ip, port, param.getUserPageId(), MsgUtil.toJsonWithNewLinePlain(msg), new NettySourceClientHandler(param, sourceResponseService));
} else {
throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT);
}

View File

@@ -1,6 +1,7 @@
package com.njcn.gather.detection.util.socket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.njcn.gather.detection.pojo.vo.SocketDataMsg;
/**
@@ -15,4 +16,24 @@ public class MsgUtil {
public static SocketDataMsg socketDataMsg(String textMsg){
return JSON.parseObject(textMsg,SocketDataMsg.class);
}
/**
* 将对象转换为 JSON 字符串,并在末尾添加换行符
*
* @param obj 需要转换的对象
* @return 包含换行符的 JSON 字符串
*/
public static String toJsonWithNewLine(Object obj) {
return JSON.toJSONString(obj, SerializerFeature.PrettyFormat) + "\n";
}
/**
* 将对象转换为 JSON 字符串,并在末尾添加换行符(不带格式化)
*
* @param obj 需要转换的对象
* @return 包含换行符的 JSON 字符串
*/
public static String toJsonWithNewLinePlain(Object obj) {
return JSON.toJSONString(obj) + "\n";
}
}

View File

@@ -2,7 +2,7 @@ package com.njcn.gather.detection.util.socket;
import cn.hutool.core.util.ObjectUtil;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -14,22 +14,39 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class SocketManager {
private static final Map<String, Channel> userSessions = new ConcurrentHashMap<>();
private static final Map<String, Channel> socketSessions = new ConcurrentHashMap<>();
private static final Map<String, NioEventLoopGroup> socketGroup = new ConcurrentHashMap<>();
public static void addUser(String userId, Channel channel) {
userSessions.put(userId, channel);
socketSessions.put(userId, channel);
}
public static void addGroup(String userId, NioEventLoopGroup group) {
socketGroup.put(userId, group);
}
public static void removeUser(String userId) {
userSessions.remove(userId);
Channel channel = socketSessions.get(userId);
if(ObjectUtil.isNotNull(channel)){
channel.close();
NioEventLoopGroup eventExecutors = socketGroup.get(userId);
if(ObjectUtil.isNotNull(channel)){
eventExecutors.shutdownGracefully();
System.out.println(userId+"__"+channel.id()+"关闭了客户端");
}
}
socketSessions.remove(userId);
}
public static Channel getChannelByUserId(String userId) {
return userSessions.get(userId);
return socketSessions.get(userId);
}
public static NioEventLoopGroup getGroupByUserId(String userId) {
return socketGroup.get(userId);
}
public static void sendMsg(String userId,String msg) {
Channel channel = userSessions.get(userId);
Channel channel = socketSessions.get(userId);
if(ObjectUtil.isNotNull(channel)){
channel.writeAndFlush(msg);
System.out.println(userId+"__"+channel.id()+""+channel.remoteAddress()+"发送数据:"+msg);

View File

@@ -1,9 +1,8 @@
package com.njcn.gather.detection.util.socket.cilent;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.gather.detection.pojo.enums.SourceResponseCodeEnum;
import com.njcn.gather.detection.pojo.vo.WebSocketVO;
import com.njcn.gather.detection.util.socket.SocketManager;
@@ -13,21 +12,12 @@ import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.TimeoutException;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.Getter;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;
/**
@@ -79,6 +69,16 @@ public class NettyClient {
channelFuture.channel().writeAndFlush(msg);
}
});
NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(userPageId + "_Dev");
if(ObjectUtil.isNotNull(groupByUserId)){
groupByUserId.shutdownGracefully();
}else{
if (handler instanceof NettySourceClientHandler) {
SocketManager.addGroup(userPageId+"_Source",group);
}else{
SocketManager.addGroup(userPageId+"_Dev",group);
}
}
} catch (Exception e) {
System.out.println("连接socker服务端发送异常............" + e.getMessage());
group.shutdownGracefully();

View File

@@ -8,7 +8,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.TimeoutException;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import java.io.IOException;
import java.net.ConnectException;
@@ -29,12 +28,6 @@ public class NettyDevClientHandler extends SimpleChannelInboundHandler<String> {
private final SocketDevResponseService socketResponseService;
@Value("${socket.device.ip}")
private String devIp;
@Value("${socket.device.port}")
private Integer devPort;
/**
* 当通道进行连接时推送消息

View File

@@ -1,10 +1,10 @@
package com.njcn.gather.detection.util.socket.cilent;
import com.alibaba.fastjson.JSON;
import com.njcn.gather.detection.handler.SocketSourceResponseService;
import com.njcn.gather.detection.pojo.enums.SourceOperateCodeEnum;
import com.njcn.gather.detection.pojo.param.PreDetectionParam;
import com.njcn.gather.detection.pojo.vo.SocketMsg;
import com.njcn.gather.detection.util.socket.MsgUtil;
import com.njcn.gather.detection.util.socket.SocketManager;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@@ -34,6 +34,7 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<Strin
/**
* 当通道进行连接时推送消息
*
* @param ctx
*/
@Override
@@ -44,7 +45,6 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<Strin
channel.close();
}
SocketManager.addUser(webUser.getUserPageId() + "_Source", ctx.channel());
System.out.println("一存储");
}
/**
@@ -60,6 +60,7 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<Strin
/**
* 当通道断线时,支持重连
*
* @param ctx
*/
@Override
@@ -88,7 +89,7 @@ public class NettySourceClientHandler extends SimpleChannelInboundHandler<Strin
msg.setRequestId("yxt");
msg.setOperateCode(SourceOperateCodeEnum.HEARTBEAT.getValue());
msg.setData("");
ctx.writeAndFlush(JSON.toJSONString(msg));
ctx.writeAndFlush(MsgUtil.toJsonWithNewLinePlain(msg));
}
} else {
userEventTriggered(ctx, evt);