diff --git a/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java b/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java index f6794595..0c7cabdd 100644 --- a/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java +++ b/detection/src/main/java/com/njcn/gather/detection/controller/PreDetectionController.java @@ -34,7 +34,8 @@ public class PreDetectionController extends BaseController { private final PreDetectionService preDetectionService; /** - * 开始检测 + * 开始检测通用入口 + * @param param 实体参数 */ @PostMapping("/startPreTest") @OperateInfo diff --git a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java index 51e09ada..20597d1f 100644 --- a/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java +++ b/detection/src/main/java/com/njcn/gather/detection/handler/SocketDevResponseService.java @@ -1134,10 +1134,12 @@ public class SocketDevResponseService { FormalTestManager.realDataXiList.add(devData); successComm.add(devData.getId()); + //成功收到数据后重置超时统计时间 if (SocketManager.clockMap.containsKey(sourceIssue.getIndex())) { SocketManager.clockMap.put(sourceIssue.getIndex(), 0L); } + //当成功的通道数量和检测通道数量相同时进入 if (successComm.size() == FormalTestManager.monitorIdListComm.size()) { System.out.println(sourceIssue.getType() + splitTag + sourceIssue.getIndex() + "当前测试小项读取数据已经全部结束。。。。。。。。。"); //修改装置为监测中 @@ -1149,16 +1151,19 @@ public class SocketDevResponseService { Map textResult = detectionServiceImpl.processing(FormalTestManager.realDataXiList, param, FormalTestManager.devIdMapComm, sourceIssue, dataRule); System.out.println(textResult); - //组装实体推送给前台 + //对检测结果组装实体推送给前台 assWebJson(param, textResult, socketDataMsg, sourceIssue); //当小项结束后需要删除集合中的小项 SocketManager.delSource(sourceIssue.getIndex()); System.out.println("当前小项结束进行删除============" + sourceIssue.getType() + splitTag + sourceIssue.getIndex()); + //小项检测完后小项数减一,并更新map long residueCount = SocketManager.getSourceTarget(sourceIssue.getType()) - 1; SocketManager.addTargetMap(sourceIssue.getType(), residueCount); System.out.println("该大项还有" + residueCount + "个小项没有进行检测!!!!!!!!"); + + //当该大项中小项数量变为0,则任务该大项检测结束 if (residueCount == 0) { System.out.println(sourceIssue.getType() + sourceIssue.getIndex() + "当前测试大项已经全部结束》》》》》》》》"); //当residueCount为0则认为大项中的小项已经全部跑完,开始组装信息推送给前端 @@ -1200,6 +1205,7 @@ public class SocketDevResponseService { CnSocketUtil.sendToWebSocket(param.getUserPageId(), sourceIssues.getType() + stepBegin, null, new ArrayList<>(), null); } + //控源下发下一个小项脚本 SocketMsg xuMsg = new SocketMsg<>(); xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue()); xuMsg.setData(JSON.toJSONString(sourceIssues)); @@ -1254,42 +1260,6 @@ public class SocketDevResponseService { - -/* - private void processData(List FormalTestManager.realDataXiList, List successComm, SourceIssue sourceIssue, PreDetectionParam param, SocketDataMsg socketDataMsg, Object dataRule, String handlerSourceStr) { - if (SocketManager.clockMap.containsKey(sourceIssue.getIndex())) { - SocketManager.clockMap.put(sourceIssue.getIndex(), 0L); - } - - if (successComm.size() == FormalTestManager.monitorIdListComm.size()) { - System.out.println(sourceIssue.getType() + stepTag + sourceIssue.getIndex() + "当前测试小项读取数据已经全部结束。。。。。。。。。"); - handleSmallItemCompletion(FormalTestManager.realDataXiList, sourceIssue, param, socketDataMsg, dataRule); - if (handlePauseCondition(param, successComm, FormalTestManager.realDataXiList)) { - return; - } - startNextDetection(sourceIssue, param, socketDataMsg, handlerSourceStr); - successComm.clear(); - FormalTestManager.realDataXiList.clear(); - } - } - - private static void handleSmallItemCompletion(List FormalTestManager.realDataXiList, SourceIssue sourceIssue, PreDetectionParam param, SocketDataMsg socketDataMsg, Object dataRule) { - adPlanService.updateTestState(param.getPlanId(), param.getDevIds()); - baseDataInsertService.insert(FormalTestManager.realDataXiList, sourceIssue, param, SocketManager.valueTypeMap); - Map textResult = detectionServiceImpl.processing(FormalTestManager.realDataXiList, param, FormalTestManager.devIdMapComm, sourceIssue, dataRule); - assWebJson(param, textResult, socketDataMsg, sourceIssue); - SocketManager.delSource(sourceIssue.getIndex()); - System.out.println("当前小项结束进行删除============" + sourceIssue.getType() + stepTag + sourceIssue.getIndex()); - long residueCount = SocketManager.getSourceTarget(sourceIssue.getType()) - 1; - SocketManager.addTargetMap(sourceIssue.getType(), residueCount); - System.out.println("该大项还有" + residueCount + "个小项没有进行检测!!!!!!!!"); - if (residueCount == 0) { - handleLargeItemCompletion(sourceIssue, param, socketDataMsg); - } - } -*/ - - /** * 组装实体推送给前端 */ @@ -1312,11 +1282,6 @@ public class SocketDevResponseService { if (targetTestMap.containsKey(sourceIssue.getType())) { List devLineTestResultList = targetTestMap.get(sourceIssue.getType()); -// devListRes.forEach(it1 -> { -// devLineTestResultList.stream().filter(it2 -> it2.getDeviceId().equals(it1.getDeviceId())).findFirst().ifPresent(it2 -> { -// setNewChnResult(it2.getChnResult(), it1.getChnResult()); -// }); -// }); devLineTestResultList.addAll(devListRes); } else { targetTestMap.put(sourceIssue.getType(), devListRes); diff --git a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java index 5292ca90..520820c5 100644 --- a/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/detection/service/impl/PreDetectionServiceImpl.java @@ -78,8 +78,9 @@ public class PreDetectionServiceImpl implements PreDetectionService { @Override public void sourceCommunicationCheck(PreDetectionParam param) { - // 参数校验,目前仅检查IP是否重复 + // 参数校验,目前仅检查IP是否重复,后续可在里面扩展 checkDevIp(param); + //用于处理异常导致的socket通道未关闭,socket交互异常 DetectionCommunicateUtil.checkCommunicateChannel(param); /* 先组装源通讯协议 @@ -141,6 +142,8 @@ public class PreDetectionServiceImpl implements PreDetectionService { param.setErrorSysId(plan.getErrorSysId()); param.setCode(String.valueOf(plan.getCode())); if (ObjectUtil.isNotNull(planSource)) { + + //获取源初始化参数 SourceInitialize sourceParam = pqSourceService.getSourceInitializeParam(planSource.getSourceId()); if (ObjectUtil.isNotNull(sourceParam)) { //开始组装socket报文请求头 @@ -150,6 +153,7 @@ public class PreDetectionServiceImpl implements PreDetectionService { socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue()); socketMsg.setData(JSON.toJSONString(sourceParam)); + //建立与源控程序的socket连接, NettyClient.socketClient(ip, port, param, JSON.toJSONString(socketMsg), new NettySourceClientHandler(param, sourceResponseService)); } else { throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT); diff --git a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java index de88bdb6..2269996a 100644 --- a/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java +++ b/detection/src/main/java/com/njcn/gather/detection/util/socket/cilent/NettyClient.java @@ -49,10 +49,10 @@ public class NettyClient { //空闲状态的handler // 添加LineBasedFrameDecoder来按行分割数据 .addLast(new LineBasedFrameDecoder(10240)) - // .addLast(new IdleStateHandler(0, 10, 0, TimeUnit.SECONDS)) + // .addLast(new IdleStateHandler(0, 10, 0, TimeUnit.SECONDS)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) - .addLast(new HeartbeatHandler(param,source)) + .addLast(new HeartbeatHandler(param, source)) .addLast(handler); } else { ch.pipeline() @@ -61,7 +61,7 @@ public class NettyClient { .addLast(new LineBasedFrameDecoder(10240)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) - .addLast(new HeartbeatHandler(param,dev)) + .addLast(new HeartbeatHandler(param, dev)) //空闲状态的handler .addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)) .addLast(handler); @@ -75,20 +75,25 @@ public class NettyClient { System.out.println("链接服务端失败..."); } else { System.out.println("链接服务端成功..."); - System.out.println("客户端向服务端发送消息:"+port+msg); - channelFuture.channel().writeAndFlush(msg+"\n"); + System.out.println("客户端向服务端发送消息:" + port + msg); + channelFuture.channel().writeAndFlush(msg + "\n"); } }); - NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + "_Dev"); - if(ObjectUtil.isNotNull(groupByUserId)){ - groupByUserId.shutdownGracefully(); - }else{ - if (handler instanceof NettySourceClientHandler) { - SocketManager.addGroup(param.getUserPageId()+source,group); - }else{ - SocketManager.addGroup(param.getUserPageId()+dev,group); + + if (handler instanceof NettySourceClientHandler) { + NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + source); + if(ObjectUtil.isNotNull(groupByUserId)){ + groupByUserId.shutdownGracefully().sync(); } + SocketManager.addGroup(param.getUserPageId() + source, group); + } else { + NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + dev); + if(ObjectUtil.isNotNull(groupByUserId)){ + groupByUserId.shutdownGracefully().sync(); + } + SocketManager.addGroup(param.getUserPageId() + dev, group); } + } catch (Exception e) { System.out.println("连接socket服务端发送异常............" + e.getMessage()); group.shutdownGracefully(); @@ -100,7 +105,7 @@ public class NettyClient { socketDataMsg.setRequestId("connect"); if (handler instanceof NettySourceClientHandler) { socketDataMsg.setOperateCode("Source"); - }else{ + } else { CnSocketUtil.quitSendSource(param); socketDataMsg.setOperateCode("Dev"); } diff --git a/event_smart/.gitignore b/event_smart/.gitignore new file mode 100644 index 00000000..549e00a2 --- /dev/null +++ b/event_smart/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/event_smart/pom.xml b/event_smart/pom.xml new file mode 100644 index 00000000..f36c5849 --- /dev/null +++ b/event_smart/pom.xml @@ -0,0 +1,101 @@ + + + 4.0.0 + + com.njcn.gather + CN_Gather + 1.0.0 + + + event_smart + + + + + com.njcn + njcn-common + 0.0.1 + + + + + + + org.springframework.boot + spring-boot-starter-websocket + 2.7.12 + + + + + com.baomidou + dynamic-datasource-spring-boot-starter + 3.5.1 + + + + + com.njcn + spingboot2.3.12 + 2.3.12 + + + + com.njcn + mybatis-plus + 0.0.1 + + + + + + + + + event_smart + + + org.springframework.boot + spring-boot-maven-plugin + + + package + + repackage + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + + + src/main/resources + + **/* + + + + + + diff --git a/event_smart/src/main/java/com/njcn/gather/event/EventSmartApplication.java b/event_smart/src/main/java/com/njcn/gather/event/EventSmartApplication.java new file mode 100644 index 00000000..085b05e2 --- /dev/null +++ b/event_smart/src/main/java/com/njcn/gather/event/EventSmartApplication.java @@ -0,0 +1,15 @@ +package com.njcn.gather.event; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@Slf4j +@SpringBootApplication +public class EventSmartApplication { + + public static void main(String[] args) { + SpringApplication.run(EventSmartApplication.class, args); + } + +} diff --git a/event_smart/src/main/java/com/njcn/gather/event/mapper/PqsEventdetailMapper.java b/event_smart/src/main/java/com/njcn/gather/event/mapper/PqsEventdetailMapper.java new file mode 100644 index 00000000..944c860b --- /dev/null +++ b/event_smart/src/main/java/com/njcn/gather/event/mapper/PqsEventdetailMapper.java @@ -0,0 +1,9 @@ +package com.njcn.gather.event.mapper; + +import com.github.jeffreyning.mybatisplus.base.MppBaseMapper; +import com.njcn.gather.event.pojo.po.PqsEventdetailPO; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface PqsEventdetailMapper extends MppBaseMapper { +} diff --git a/event_smart/src/main/java/com/njcn/gather/event/mapper/RmpEventDetailMapper.java b/event_smart/src/main/java/com/njcn/gather/event/mapper/RmpEventDetailMapper.java new file mode 100644 index 00000000..ac069863 --- /dev/null +++ b/event_smart/src/main/java/com/njcn/gather/event/mapper/RmpEventDetailMapper.java @@ -0,0 +1,10 @@ +package com.njcn.gather.event.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.gather.event.pojo.po.RmpEventDetailPO; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface RmpEventDetailMapper extends BaseMapper { + +} diff --git a/event_smart/src/main/java/com/njcn/gather/event/mq/consumer/EventConsumer.java b/event_smart/src/main/java/com/njcn/gather/event/mq/consumer/EventConsumer.java new file mode 100644 index 00000000..39015a51 --- /dev/null +++ b/event_smart/src/main/java/com/njcn/gather/event/mq/consumer/EventConsumer.java @@ -0,0 +1,44 @@ +//package com.njcn.gather.event.mq.consumer; +// +//import com.njcn.message.messagedto.MessageDataDTO; +//import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +//import lombok.extern.slf4j.Slf4j; +//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +//import org.apache.rocketmq.spring.core.RocketMQListener; +//import org.springframework.stereotype.Component; +// +///** +// * @Author: cdf +// * @CreateTime: 2025-06-04 +// * @Description: 暂降消费者 +// */ +//@Component +//@RocketMQMessageListener( +// topic = "CC", +// consumerGroup = "CC", +// selectorExpression = "Test_Tag||Test_Keys", +// consumeThreadNumber = 10, +// enableMsgTrace = true +//) +//@Slf4j +//public class EventConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { +// @Override +// protected void handleMessage(MessageDataDTO message) throws Exception { +// +// } +// +// @Override +// protected boolean isRetry() { +// return false; +// } +// +// @Override +// protected boolean throwException() { +// return false; +// } +// +// @Override +// public void onMessage(String message) { +// +// } +//} diff --git a/event_smart/src/main/java/com/njcn/gather/event/pojo/po/PqsEventdetailPO.java b/event_smart/src/main/java/com/njcn/gather/event/pojo/po/PqsEventdetailPO.java new file mode 100644 index 00000000..61610806 --- /dev/null +++ b/event_smart/src/main/java/com/njcn/gather/event/pojo/po/PqsEventdetailPO.java @@ -0,0 +1,94 @@ +package com.njcn.gather.event.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import com.github.jeffreyning.mybatisplus.anno.MppMultiId; +import lombok.Data; + +import java.math.BigDecimal; +import java.util.Date; + +/** + * + * Description: + * 接口文档访问地址:http://serverIP:port/swagger-ui.html + * Date: 2022/12/28 13:46【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +@TableName(value = "PQS_EVENTDETAIL") +public class PqsEventdetailPO { + @MppMultiId(value = "EVENTDETAIL_INDEX") + private String eventdetailIndex; + + @TableField(value = "LINEID") + private BigDecimal lineid; + + @TableField(value = "TIMEID") + private Date timeid; + + @TableField(value = "MS") + private BigDecimal ms; + + @TableField(value = "DESCRIBE") + private String describe; + + @TableField(value = "WAVETYPE") + private Integer wavetype; + + @TableField(value = "PERSISTTIME") + private Double persisttime; + + @TableField(value = "EVENTVALUE") + private Double eventvalue; + + @TableField(value = "EVENTREASON") + private String eventreason; + + @TableField(value = "EVENTTYPE") + private String eventtype; + + @TableField(value = "EVENTASS_INDEX") + private String eventassIndex; + + @TableField(value = "DQTIME") + private Double dqtime; + + @TableField(value = "DEALTIME") + private Date dealtime; + + @TableField(value = "DEALFLAG") + private Integer dealflag; + + @TableField(value = "NUM") + private BigDecimal num; + + @TableField(value = "FILEFLAG") + private Integer fileflag; + + @TableField(value = "FIRSTTIME") + private Date firsttime; + + @TableField(value = "FIRSTTYPE") + private String firsttype; + + @TableField(value = "FIRSTMS") + private BigDecimal firstms; + + @TableField(value = "WAVENAME") + private String wavename; + + @TableField(value = "ENERGY") + private Double energy; + + @TableField(value = "SEVERITY") + private Double severity; + + /** + * 暂降源与监测位置关系Upper:上游;Lower :下游;Unknown :未知;为空则是未计算 + */ + @TableField(value = "SAGSOURCE") + private String sagsource; +} \ No newline at end of file diff --git a/event_smart/src/main/java/com/njcn/gather/event/pojo/po/RmpEventDetailPO.java b/event_smart/src/main/java/com/njcn/gather/event/pojo/po/RmpEventDetailPO.java new file mode 100644 index 00000000..15722426 --- /dev/null +++ b/event_smart/src/main/java/com/njcn/gather/event/pojo/po/RmpEventDetailPO.java @@ -0,0 +1,126 @@ +package com.njcn.gather.event.pojo.po; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * 暂降明细实体类 + * + * @author yzh + * @since 2022-10-12 18:34:55 + */ +@Data +@TableName("r_mp_event_detail") +@ApiModel(value="RmpEventDetail对象") +public class RmpEventDetailPO implements Serializable { + private static final long serialVersionUID = 1L; + + @ApiModelProperty(value = "暂时事件ID") + @TableId(value = "event_id", type = IdType.ASSIGN_ID) + private String eventId; + + @ApiModelProperty(value = "监测点ID") + private String measurementPointId; + + @ApiModelProperty(value = "监测点ID(复制)") + @TableField("measurement_point_id") + private String lineId; + + @ApiModelProperty(value = "统计类型") + private String eventType; + + @ApiModelProperty(value = "暂降原因(Event_Reason)") + @TableField("advance_reason") + private String advanceReason; + + @ApiModelProperty(value = "暂降类型(Event_Type)") + @TableField("advance_type") + private String advanceType; + + @ApiModelProperty(value = "事件关联分析表Guid") + private String eventassIndex; + + @ApiModelProperty(value = "dq计算持续时间 ") + private Double dqTime; + + @ApiModelProperty(value = "特征值计算更新时间(外键PQS_Relevance的Time字段)") + private LocalDateTime dealTime; + + @ApiModelProperty(value = "默认事件个数为0") + private Integer num; + + @ApiModelProperty(value = "波形文件是否从装置招到本地(0:未招,1:已招)默认值为0") + private Integer fileFlag; + + @ApiModelProperty(value = "特征值计算标志(0,未处理;1,已处理; 2,已处理,无结果;3,计算失败)默认值为0") + private Integer dealFlag; + + @ApiModelProperty(value = "处理结果第一条事件发生时间(读comtra文件获取)") + private LocalDateTime firstTime; + + @ApiModelProperty(value = "处理结果第一条事件暂降类型(字典表PQS_Dicdata)") + private String firstType; + + @ApiModelProperty(value = "处理结果第一条事件发生时间毫秒(读comtra文件获取)") + private Double firstMs; + + @ApiModelProperty(value = "暂降能量") + private Double energy; + + @ApiModelProperty(value = "暂降严重度") + private Double severity; + + @ApiModelProperty(value = "暂降源与监测位置关系 Upper:上游;Lower :下游;Unknown :未知;为空则是未计算") + private String sagsource; + + @ApiModelProperty(value = "开始时间") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") + private LocalDateTime startTime; + + @ApiModelProperty(value = "格式化开始时间") + @TableField(exist = false) + private String formatTime; + + + @ApiModelProperty(value = "持续时间,单位秒") + private Double duration; + + @ApiModelProperty(value = "特征幅值") + private Double featureAmplitude; + + @ApiModelProperty(value = "相别") + private String phase; + + @ApiModelProperty(value = "事件描述") + private String eventDescribe; + + @ApiModelProperty(value = "波形路径") + private String wavePath; + + @ApiModelProperty(value = "暂降核实原因") + @TableField("verify_reason") + private String verifyReason; + + @ApiModelProperty(value = "暂降核实原因详情") + @TableField("verify_reason_detail") + private String verifyReasonDetail; + + private Double transientValue; + + private LocalDateTime createTime; + + @ApiModelProperty(value = "用于计算数量") + @TableField(exist = false) + private Integer count; + +} + diff --git a/event_smart/src/main/java/com/njcn/gather/event/websocket/WebSocketConfig.java b/event_smart/src/main/java/com/njcn/gather/event/websocket/WebSocketConfig.java new file mode 100644 index 00000000..724e390d --- /dev/null +++ b/event_smart/src/main/java/com/njcn/gather/event/websocket/WebSocketConfig.java @@ -0,0 +1,41 @@ +package com.njcn.gather.event.websocket; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; +import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean; + +/** + * Description: + * Date: 2024/12/13 15:09【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Configuration +public class WebSocketConfig { + + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } + + /** + * 通信文本消息和二进制缓存区大小 + * 避免对接 第三方 报文过大时,Websocket 1009 错误 + * + * @return + */ + + @Bean + public ServletServerContainerFactoryBean createWebSocketContainer() { + ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); + // 在此处设置bufferSize + container.setMaxTextMessageBufferSize(10240000); + container.setMaxBinaryMessageBufferSize(10240000); + container.setMaxSessionIdleTimeout(15 * 60000L); + return container; + } + + +} diff --git a/event_smart/src/main/java/com/njcn/gather/event/websocket/WebSocketServer.java b/event_smart/src/main/java/com/njcn/gather/event/websocket/WebSocketServer.java new file mode 100644 index 00000000..e04db784 --- /dev/null +++ b/event_smart/src/main/java/com/njcn/gather/event/websocket/WebSocketServer.java @@ -0,0 +1,177 @@ +package com.njcn.gather.event.websocket; + + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Description: + * Date: 2024/12/13 15:11【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Slf4j +@Component +@ServerEndpoint(value ="/api/pushMessage/{userIdAndLineIdAndDevId}") +public class WebSocketServer { + + /** + * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 + */ + private static int onlineCount = 0; + /** + * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。 + */ + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + /** + * 与某个客户端的连接会话,需要通过它来给客户端发送数据 + */ + private Session session; + /** + * 接收userId + */ + private String userIdAndLineIdAndDevId = ""; + + /** + * 连接建立成 + * 功调用的方法 + */ + @OnOpen + public void onOpen(Session session, @PathParam("userIdAndLineIdAndDevId") String userIdAndLineIdAndDevId) { + this.session = session; + this.userIdAndLineIdAndDevId = userIdAndLineIdAndDevId; + if (webSocketMap.containsKey(userIdAndLineIdAndDevId)) { + webSocketMap.remove(userIdAndLineIdAndDevId); + //加入set中 + webSocketMap.put(userIdAndLineIdAndDevId, this); + } else { + //加入set中 + webSocketMap.put(userIdAndLineIdAndDevId, this); + //在线数加1 + addOnlineCount(); + } + sendMessage("连接成功"); + } + + /** + * 连接关闭 + * 调用的方法 + */ + @OnClose + public void onClose() { + if (webSocketMap.containsKey(userIdAndLineIdAndDevId)) { + webSocketMap.remove(userIdAndLineIdAndDevId); + //从set中删除 + subOnlineCount(); + } + log.info("监测点退出:" + userIdAndLineIdAndDevId + ",当前在线监测点数为:" + getOnlineCount()); + } + + /** + * 收到客户端消 + * 息后调用的方法 + * + * @param message 客户端发送过来的消息 + **/ + @OnMessage + public void onMessage(String message, Session session) { + //会每30s发送请求1次 + log.info("监测点消息:" + userIdAndLineIdAndDevId + ",报文:" + message); + log.info("监测点连接:" + userIdAndLineIdAndDevId + ",当前在线监测点数为:" + getOnlineCount()); + + } + + + /** + * @param session + * @param error + */ + @OnError + public void onError(Session session, Throwable error) { + + log.error("监测点错误:" + this.userIdAndLineIdAndDevId + ",原因:" + error.getMessage()); + error.printStackTrace(); + } + + /** + * 实现服务 + * 器主动推送 + */ + public void sendMessage(String message) { + try { + this.session.getBasicRemote().sendText(message); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 发送自定 + * 义消息 + **/ + public static void sendInfo(String message, String lineId) { + log.info("发送消息到:" + lineId + ",报文:" + message); + if (StringUtils.isNotBlank(lineId)) { + Map stringStringMap = WebSocketServer.filterMapByKey(webSocketMap, lineId); + stringStringMap.forEach((k,v)->{ + webSocketMap.get(k).sendMessage(message); + + }); + } else { + log.error("监测点" + lineId + ",不在线!"); + } + } + + /** + * 获得此时的 + * 在线监测点 + * + * @return + */ + public static synchronized int getOnlineCount() { + return onlineCount; + } + + /** + * 在线监测点 + * 数加1 + */ + public static synchronized void addOnlineCount() { + WebSocketServer.onlineCount++; + } + + /** + * 在线监测点 + * 数减1 + */ + public static synchronized void subOnlineCount() { + WebSocketServer.onlineCount--; + } + + /** + * 过滤所有键包含指定字符串的条目 + * @param map 原始的Map + * @param substring 要检查的子字符串 + * @return 过滤的Map + */ + public static Map filterMapByKey(ConcurrentHashMap map, String substring) { + Map result = new HashMap<>(); + for (Map.Entry entry : map.entrySet()) { + if (entry.getKey().contains(substring)) { + result.put(entry.getKey(), entry.getValue().toString()); + } + } + return result; + } + +} \ No newline at end of file diff --git a/event_smart/src/main/resources/application.yml b/event_smart/src/main/resources/application.yml new file mode 100644 index 00000000..b3cddaa6 --- /dev/null +++ b/event_smart/src/main/resources/application.yml @@ -0,0 +1,55 @@ +server: + port: 18093 +spring: + application: + name: event_smart + datasource: + dynamic: + primary: master + strict: false # 是否严格匹配数据源,默认false + druid: # 如果使用Druid连接池 + validation-query: SELECT 1 FROM DUAL # 达梦专用校验SQL + initial-size: 10 + # 初始化大小,最小,最大 + min-idle: 20 + maxActive: 500 + # 配置获取连接等待超时的时间 + maxWait: 60000 + # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + timeBetweenEvictionRunsMillis: 60000 + # 配置一个连接在池中最小生存的时间,单位是毫秒 + minEvictableIdleTimeMillis: 300000 + testWhileIdle: true + testOnBorrow: true + testOnReturn: false + # 打开PSCache,并且指定每个连接上PSCache的大小 + poolPreparedStatements: true + maxPoolPreparedStatementPerConnectionSize: 20 + datasource: + master: + driver-class-name: dm.jdbc.driver.DmDriver + url: jdbc:dm://192.168.1.21:5236/PQSADMIN?useUnicode=true&characterEncoding=utf-8 + username: PQSADMINLN + password: Pqsadmin123 + +#mybatis配置信息 +mybatis-plus: + mapper-locations: classpath*:com/njcn/**/mapping/*.xml + #别名扫描 + type-aliases-package: com.njcn.gather.event.pojo + configuration: + #驼峰命名 + map-underscore-to-camel-case: true + #配置sql日志输出 + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl + #关闭日志输出 +# log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl + global-config: + db-config: + #指定主键生成策略 + id-type: assign_uuid + + + + + diff --git a/event_smart/src/main/resources/logback.xml b/event_smart/src/main/resources/logback.xml new file mode 100644 index 00000000..327bcd9f --- /dev/null +++ b/event_smart/src/main/resources/logback.xml @@ -0,0 +1,139 @@ + + + + + + + + + + + + + + + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + UTF-8 + + + + + + + + ${logHomeDir}/${log.projectName}/debug/debug.log + + + + + DEBUG + + ACCEPT + + DENY + + + + + + ${logHomeDir}/${log.projectName}/debug/debug.log.%d{yyyy-MM-dd}.%i.log + + 10MB + + ${log.maxHistory:-30} + + + + + + + + + + ${log.pattern} + + UTF-8 + + + + + + + INFO + ACCEPT + DENY + + + ${logHomeDir}/${log.projectName}/info/info.log + + + + ${logHomeDir}/${log.projectName}/info/info.log.%d{yyyy-MM-dd}.%i.log + + 10MB + ${log.maxHistory:-30} + + + + ${log.pattern} + + UTF-8 + + + + + + + + ${logHomeDir}/${log.projectName}/error/error.log + + + ERROR + ACCEPT + DENY + + + + ${logHomeDir}/${log.projectName}/error/error.log.%d{yyyy-MM-dd}.%i.log + + 10MB + ${log.maxHistory:-30} + + + + ${log.pattern} + + UTF-8 + + + + + + + + + + + + + + + + + + + + + + + diff --git a/event_smart/src/test/java/com/njcn/gather/event/EventSmartApplicationTests.java b/event_smart/src/test/java/com/njcn/gather/event/EventSmartApplicationTests.java new file mode 100644 index 00000000..fb9b0aae --- /dev/null +++ b/event_smart/src/test/java/com/njcn/gather/event/EventSmartApplicationTests.java @@ -0,0 +1,13 @@ +package com.njcn.gather.event; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class EventSmartApplicationTests { + + @Test + void contextLoads() { + } + +}