添加暂降智能响应模块

This commit is contained in:
2025-06-19 08:43:26 +08:00
parent 4e0e50a4f8
commit 418e17a427
17 changed files with 890 additions and 58 deletions

View File

@@ -34,7 +34,8 @@ public class PreDetectionController extends BaseController {
private final PreDetectionService preDetectionService; private final PreDetectionService preDetectionService;
/** /**
* 开始检测 * 开始检测通用入口
* @param param 实体参数
*/ */
@PostMapping("/startPreTest") @PostMapping("/startPreTest")
@OperateInfo @OperateInfo

View File

@@ -1134,10 +1134,12 @@ public class SocketDevResponseService {
FormalTestManager.realDataXiList.add(devData); FormalTestManager.realDataXiList.add(devData);
successComm.add(devData.getId()); successComm.add(devData.getId());
//成功收到数据后重置超时统计时间
if (SocketManager.clockMap.containsKey(sourceIssue.getIndex())) { if (SocketManager.clockMap.containsKey(sourceIssue.getIndex())) {
SocketManager.clockMap.put(sourceIssue.getIndex(), 0L); SocketManager.clockMap.put(sourceIssue.getIndex(), 0L);
} }
//当成功的通道数量和检测通道数量相同时进入
if (successComm.size() == FormalTestManager.monitorIdListComm.size()) { if (successComm.size() == FormalTestManager.monitorIdListComm.size()) {
System.out.println(sourceIssue.getType() + splitTag + sourceIssue.getIndex() + "当前测试小项读取数据已经全部结束。。。。。。。。。"); System.out.println(sourceIssue.getType() + splitTag + sourceIssue.getIndex() + "当前测试小项读取数据已经全部结束。。。。。。。。。");
//修改装置为监测中 //修改装置为监测中
@@ -1149,16 +1151,19 @@ public class SocketDevResponseService {
Map<String, Integer> textResult = detectionServiceImpl.processing(FormalTestManager.realDataXiList, param, FormalTestManager.devIdMapComm, sourceIssue, dataRule); Map<String, Integer> textResult = detectionServiceImpl.processing(FormalTestManager.realDataXiList, param, FormalTestManager.devIdMapComm, sourceIssue, dataRule);
System.out.println(textResult); System.out.println(textResult);
//组装实体推送给前台 //对检测结果组装实体推送给前台
assWebJson(param, textResult, socketDataMsg, sourceIssue); assWebJson(param, textResult, socketDataMsg, sourceIssue);
//当小项结束后需要删除集合中的小项 //当小项结束后需要删除集合中的小项
SocketManager.delSource(sourceIssue.getIndex()); SocketManager.delSource(sourceIssue.getIndex());
System.out.println("当前小项结束进行删除============" + sourceIssue.getType() + splitTag + sourceIssue.getIndex()); System.out.println("当前小项结束进行删除============" + sourceIssue.getType() + splitTag + sourceIssue.getIndex());
//小项检测完后小项数减一并更新map
long residueCount = SocketManager.getSourceTarget(sourceIssue.getType()) - 1; long residueCount = SocketManager.getSourceTarget(sourceIssue.getType()) - 1;
SocketManager.addTargetMap(sourceIssue.getType(), residueCount); SocketManager.addTargetMap(sourceIssue.getType(), residueCount);
System.out.println("该大项还有" + residueCount + "个小项没有进行检测!!!!!!!!"); System.out.println("该大项还有" + residueCount + "个小项没有进行检测!!!!!!!!");
//当该大项中小项数量变为0则任务该大项检测结束
if (residueCount == 0) { if (residueCount == 0) {
System.out.println(sourceIssue.getType() + sourceIssue.getIndex() + "当前测试大项已经全部结束》》》》》》》》"); System.out.println(sourceIssue.getType() + sourceIssue.getIndex() + "当前测试大项已经全部结束》》》》》》》》");
//当residueCount为0则认为大项中的小项已经全部跑完开始组装信息推送给前端 //当residueCount为0则认为大项中的小项已经全部跑完开始组装信息推送给前端
@@ -1200,6 +1205,7 @@ public class SocketDevResponseService {
CnSocketUtil.sendToWebSocket(param.getUserPageId(), sourceIssues.getType() + stepBegin, null, new ArrayList<>(), null); CnSocketUtil.sendToWebSocket(param.getUserPageId(), sourceIssues.getType() + stepBegin, null, new ArrayList<>(), null);
} }
//控源下发下一个小项脚本
SocketMsg<String> xuMsg = new SocketMsg<>(); SocketMsg<String> xuMsg = new SocketMsg<>();
xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue()); xuMsg.setOperateCode(SourceOperateCodeEnum.OPER_GATHER.getValue());
xuMsg.setData(JSON.toJSONString(sourceIssues)); xuMsg.setData(JSON.toJSONString(sourceIssues));
@@ -1254,42 +1260,6 @@ public class SocketDevResponseService {
/*
private void processData(List<DevData> FormalTestManager.realDataXiList, List<String> successComm, SourceIssue sourceIssue, PreDetectionParam param, SocketDataMsg socketDataMsg, Object dataRule, String handlerSourceStr) {
if (SocketManager.clockMap.containsKey(sourceIssue.getIndex())) {
SocketManager.clockMap.put(sourceIssue.getIndex(), 0L);
}
if (successComm.size() == FormalTestManager.monitorIdListComm.size()) {
System.out.println(sourceIssue.getType() + stepTag + sourceIssue.getIndex() + "当前测试小项读取数据已经全部结束。。。。。。。。。");
handleSmallItemCompletion(FormalTestManager.realDataXiList, sourceIssue, param, socketDataMsg, dataRule);
if (handlePauseCondition(param, successComm, FormalTestManager.realDataXiList)) {
return;
}
startNextDetection(sourceIssue, param, socketDataMsg, handlerSourceStr);
successComm.clear();
FormalTestManager.realDataXiList.clear();
}
}
private static void handleSmallItemCompletion(List<DevData> FormalTestManager.realDataXiList, SourceIssue sourceIssue, PreDetectionParam param, SocketDataMsg socketDataMsg, Object dataRule) {
adPlanService.updateTestState(param.getPlanId(), param.getDevIds());
baseDataInsertService.insert(FormalTestManager.realDataXiList, sourceIssue, param, SocketManager.valueTypeMap);
Map<String, Integer> textResult = detectionServiceImpl.processing(FormalTestManager.realDataXiList, param, FormalTestManager.devIdMapComm, sourceIssue, dataRule);
assWebJson(param, textResult, socketDataMsg, sourceIssue);
SocketManager.delSource(sourceIssue.getIndex());
System.out.println("当前小项结束进行删除============" + sourceIssue.getType() + stepTag + sourceIssue.getIndex());
long residueCount = SocketManager.getSourceTarget(sourceIssue.getType()) - 1;
SocketManager.addTargetMap(sourceIssue.getType(), residueCount);
System.out.println("该大项还有" + residueCount + "个小项没有进行检测!!!!!!!!");
if (residueCount == 0) {
handleLargeItemCompletion(sourceIssue, param, socketDataMsg);
}
}
*/
/** /**
* 组装实体推送给前端 * 组装实体推送给前端
*/ */
@@ -1312,11 +1282,6 @@ public class SocketDevResponseService {
if (targetTestMap.containsKey(sourceIssue.getType())) { if (targetTestMap.containsKey(sourceIssue.getType())) {
List<DevLineTestResult> devLineTestResultList = targetTestMap.get(sourceIssue.getType()); List<DevLineTestResult> devLineTestResultList = targetTestMap.get(sourceIssue.getType());
// devListRes.forEach(it1 -> {
// devLineTestResultList.stream().filter(it2 -> it2.getDeviceId().equals(it1.getDeviceId())).findFirst().ifPresent(it2 -> {
// setNewChnResult(it2.getChnResult(), it1.getChnResult());
// });
// });
devLineTestResultList.addAll(devListRes); devLineTestResultList.addAll(devListRes);
} else { } else {
targetTestMap.put(sourceIssue.getType(), devListRes); targetTestMap.put(sourceIssue.getType(), devListRes);

View File

@@ -78,8 +78,9 @@ public class PreDetectionServiceImpl implements PreDetectionService {
@Override @Override
public void sourceCommunicationCheck(PreDetectionParam param) { public void sourceCommunicationCheck(PreDetectionParam param) {
// 参数校验目前仅检查IP是否重复 // 参数校验目前仅检查IP是否重复,后续可在里面扩展
checkDevIp(param); checkDevIp(param);
//用于处理异常导致的socket通道未关闭socket交互异常
DetectionCommunicateUtil.checkCommunicateChannel(param); DetectionCommunicateUtil.checkCommunicateChannel(param);
/* /*
先组装源通讯协议 先组装源通讯协议
@@ -141,6 +142,8 @@ public class PreDetectionServiceImpl implements PreDetectionService {
param.setErrorSysId(plan.getErrorSysId()); param.setErrorSysId(plan.getErrorSysId());
param.setCode(String.valueOf(plan.getCode())); param.setCode(String.valueOf(plan.getCode()));
if (ObjectUtil.isNotNull(planSource)) { if (ObjectUtil.isNotNull(planSource)) {
//获取源初始化参数
SourceInitialize sourceParam = pqSourceService.getSourceInitializeParam(planSource.getSourceId()); SourceInitialize sourceParam = pqSourceService.getSourceInitializeParam(planSource.getSourceId());
if (ObjectUtil.isNotNull(sourceParam)) { if (ObjectUtil.isNotNull(sourceParam)) {
//开始组装socket报文请求头 //开始组装socket报文请求头
@@ -150,6 +153,7 @@ public class PreDetectionServiceImpl implements PreDetectionService {
socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue()); socketMsg.setRequestId(SourceOperateCodeEnum.YJC_YTXJY.getValue());
socketMsg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue()); socketMsg.setOperateCode(SourceOperateCodeEnum.INIT_GATHER.getValue());
socketMsg.setData(JSON.toJSONString(sourceParam)); socketMsg.setData(JSON.toJSONString(sourceParam));
//建立与源控程序的socket连接
NettyClient.socketClient(ip, port, param, JSON.toJSONString(socketMsg), new NettySourceClientHandler(param, sourceResponseService)); NettyClient.socketClient(ip, port, param, JSON.toJSONString(socketMsg), new NettySourceClientHandler(param, sourceResponseService));
} else { } else {
throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT); throw new BusinessException(DetectionResponseEnum.SOURCE_INFO_NOT);

View File

@@ -49,10 +49,10 @@ public class NettyClient {
//空闲状态的handler //空闲状态的handler
// 添加LineBasedFrameDecoder来按行分割数据 // 添加LineBasedFrameDecoder来按行分割数据
.addLast(new LineBasedFrameDecoder(10240)) .addLast(new LineBasedFrameDecoder(10240))
// .addLast(new IdleStateHandler(0, 10, 0, TimeUnit.SECONDS)) // .addLast(new IdleStateHandler(0, 10, 0, TimeUnit.SECONDS))
.addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new HeartbeatHandler(param,source)) .addLast(new HeartbeatHandler(param, source))
.addLast(handler); .addLast(handler);
} else { } else {
ch.pipeline() ch.pipeline()
@@ -61,7 +61,7 @@ public class NettyClient {
.addLast(new LineBasedFrameDecoder(10240)) .addLast(new LineBasedFrameDecoder(10240))
.addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new HeartbeatHandler(param,dev)) .addLast(new HeartbeatHandler(param, dev))
//空闲状态的handler //空闲状态的handler
.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)) .addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS))
.addLast(handler); .addLast(handler);
@@ -75,20 +75,25 @@ public class NettyClient {
System.out.println("链接服务端失败..."); System.out.println("链接服务端失败...");
} else { } else {
System.out.println("链接服务端成功..."); System.out.println("链接服务端成功...");
System.out.println("客户端向服务端发送消息:"+port+msg); System.out.println("客户端向服务端发送消息:" + port + msg);
channelFuture.channel().writeAndFlush(msg+"\n"); channelFuture.channel().writeAndFlush(msg + "\n");
} }
}); });
NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + "_Dev");
if(ObjectUtil.isNotNull(groupByUserId)){ if (handler instanceof NettySourceClientHandler) {
groupByUserId.shutdownGracefully(); NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + source);
}else{ if(ObjectUtil.isNotNull(groupByUserId)){
if (handler instanceof NettySourceClientHandler) { groupByUserId.shutdownGracefully().sync();
SocketManager.addGroup(param.getUserPageId()+source,group);
}else{
SocketManager.addGroup(param.getUserPageId()+dev,group);
} }
SocketManager.addGroup(param.getUserPageId() + source, group);
} else {
NioEventLoopGroup groupByUserId = SocketManager.getGroupByUserId(param.getUserPageId() + dev);
if(ObjectUtil.isNotNull(groupByUserId)){
groupByUserId.shutdownGracefully().sync();
}
SocketManager.addGroup(param.getUserPageId() + dev, group);
} }
} catch (Exception e) { } catch (Exception e) {
System.out.println("连接socket服务端发送异常............" + e.getMessage()); System.out.println("连接socket服务端发送异常............" + e.getMessage());
group.shutdownGracefully(); group.shutdownGracefully();
@@ -100,7 +105,7 @@ public class NettyClient {
socketDataMsg.setRequestId("connect"); socketDataMsg.setRequestId("connect");
if (handler instanceof NettySourceClientHandler) { if (handler instanceof NettySourceClientHandler) {
socketDataMsg.setOperateCode("Source"); socketDataMsg.setOperateCode("Source");
}else{ } else {
CnSocketUtil.quitSendSource(param); CnSocketUtil.quitSendSource(param);
socketDataMsg.setOperateCode("Dev"); socketDataMsg.setOperateCode("Dev");
} }

33
event_smart/.gitignore vendored Normal file
View File

@@ -0,0 +1,33 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

101
event_smart/pom.xml Normal file
View File

@@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.njcn.gather</groupId>
<artifactId>CN_Gather</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>event_smart</artifactId>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>njcn-common</artifactId>
<version>0.0.1</version>
</dependency>
<!-- <dependency>
<groupId>com.njcn</groupId>
<artifactId>rocket-mq-springboot-starter</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.7.12</version>
</dependency>
<!-- 多数据源切换当数据源为oracle时需要使用 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>spingboot2.3.12</artifactId>
<version>2.3.12</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>mybatis-plus</artifactId>
<version>0.0.1</version>
</dependency>
</dependencies>
<build>
<finalName>event_smart</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*</include>
</includes>
</resource>
</resources>
</build>
</project>

View File

@@ -0,0 +1,15 @@
package com.njcn.gather.event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@Slf4j
@SpringBootApplication
public class EventSmartApplication {
public static void main(String[] args) {
SpringApplication.run(EventSmartApplication.class, args);
}
}

View File

@@ -0,0 +1,9 @@
package com.njcn.gather.event.mapper;
import com.github.jeffreyning.mybatisplus.base.MppBaseMapper;
import com.njcn.gather.event.pojo.po.PqsEventdetailPO;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface PqsEventdetailMapper extends MppBaseMapper<PqsEventdetailPO> {
}

View File

@@ -0,0 +1,10 @@
package com.njcn.gather.event.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.gather.event.pojo.po.RmpEventDetailPO;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface RmpEventDetailMapper extends BaseMapper<RmpEventDetailPO> {
}

View File

@@ -0,0 +1,44 @@
//package com.njcn.gather.event.mq.consumer;
//
//import com.njcn.message.messagedto.MessageDataDTO;
//import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
//import org.apache.rocketmq.spring.core.RocketMQListener;
//import org.springframework.stereotype.Component;
//
///**
// * @Author: cdf
// * @CreateTime: 2025-06-04
// * @Description: 暂降消费者
// */
//@Component
//@RocketMQMessageListener(
// topic = "CC",
// consumerGroup = "CC",
// selectorExpression = "Test_Tag||Test_Keys",
// consumeThreadNumber = 10,
// enableMsgTrace = true
//)
//@Slf4j
//public class EventConsumer extends EnhanceConsumerMessageHandler<MessageDataDTO> implements RocketMQListener<String> {
// @Override
// protected void handleMessage(MessageDataDTO message) throws Exception {
//
// }
//
// @Override
// protected boolean isRetry() {
// return false;
// }
//
// @Override
// protected boolean throwException() {
// return false;
// }
//
// @Override
// public void onMessage(String message) {
//
// }
//}

View File

@@ -0,0 +1,94 @@
package com.njcn.gather.event.pojo.po;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.github.jeffreyning.mybatisplus.anno.MppMultiId;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;
/**
*
* Description:
* 接口文档访问地址http://serverIP:port/swagger-ui.html
* Date: 2022/12/28 13:46【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
@TableName(value = "PQS_EVENTDETAIL")
public class PqsEventdetailPO {
@MppMultiId(value = "EVENTDETAIL_INDEX")
private String eventdetailIndex;
@TableField(value = "LINEID")
private BigDecimal lineid;
@TableField(value = "TIMEID")
private Date timeid;
@TableField(value = "MS")
private BigDecimal ms;
@TableField(value = "DESCRIBE")
private String describe;
@TableField(value = "WAVETYPE")
private Integer wavetype;
@TableField(value = "PERSISTTIME")
private Double persisttime;
@TableField(value = "EVENTVALUE")
private Double eventvalue;
@TableField(value = "EVENTREASON")
private String eventreason;
@TableField(value = "EVENTTYPE")
private String eventtype;
@TableField(value = "EVENTASS_INDEX")
private String eventassIndex;
@TableField(value = "DQTIME")
private Double dqtime;
@TableField(value = "DEALTIME")
private Date dealtime;
@TableField(value = "DEALFLAG")
private Integer dealflag;
@TableField(value = "NUM")
private BigDecimal num;
@TableField(value = "FILEFLAG")
private Integer fileflag;
@TableField(value = "FIRSTTIME")
private Date firsttime;
@TableField(value = "FIRSTTYPE")
private String firsttype;
@TableField(value = "FIRSTMS")
private BigDecimal firstms;
@TableField(value = "WAVENAME")
private String wavename;
@TableField(value = "ENERGY")
private Double energy;
@TableField(value = "SEVERITY")
private Double severity;
/**
* 暂降源与监测位置关系Upper:上游Lower :下游Unknown :未知;为空则是未计算
*/
@TableField(value = "SAGSOURCE")
private String sagsource;
}

View File

@@ -0,0 +1,126 @@
package com.njcn.gather.event.pojo.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* 暂降明细实体类
*
* @author yzh
* @since 2022-10-12 18:34:55
*/
@Data
@TableName("r_mp_event_detail")
@ApiModel(value="RmpEventDetail对象")
public class RmpEventDetailPO implements Serializable {
private static final long serialVersionUID = 1L;
@ApiModelProperty(value = "暂时事件ID")
@TableId(value = "event_id", type = IdType.ASSIGN_ID)
private String eventId;
@ApiModelProperty(value = "监测点ID")
private String measurementPointId;
@ApiModelProperty(value = "监测点ID(复制)")
@TableField("measurement_point_id")
private String lineId;
@ApiModelProperty(value = "统计类型")
private String eventType;
@ApiModelProperty(value = "暂降原因Event_Reason")
@TableField("advance_reason")
private String advanceReason;
@ApiModelProperty(value = "暂降类型Event_Type")
@TableField("advance_type")
private String advanceType;
@ApiModelProperty(value = "事件关联分析表Guid")
private String eventassIndex;
@ApiModelProperty(value = "dq计算持续时间 ")
private Double dqTime;
@ApiModelProperty(value = "特征值计算更新时间外键PQS_Relevance的Time字段")
private LocalDateTime dealTime;
@ApiModelProperty(value = "默认事件个数为0")
private Integer num;
@ApiModelProperty(value = "波形文件是否从装置招到本地(0未招1已招)默认值为0")
private Integer fileFlag;
@ApiModelProperty(value = "特征值计算标志0未处理1已处理; 2已处理无结果;3计算失败默认值为0")
private Integer dealFlag;
@ApiModelProperty(value = "处理结果第一条事件发生时间(读comtra文件获取)")
private LocalDateTime firstTime;
@ApiModelProperty(value = "处理结果第一条事件暂降类型字典表PQS_Dicdata")
private String firstType;
@ApiModelProperty(value = "处理结果第一条事件发生时间毫秒(读comtra文件获取)")
private Double firstMs;
@ApiModelProperty(value = "暂降能量")
private Double energy;
@ApiModelProperty(value = "暂降严重度")
private Double severity;
@ApiModelProperty(value = "暂降源与监测位置关系 Upper:上游Lower :下游Unknown :未知;为空则是未计算")
private String sagsource;
@ApiModelProperty(value = "开始时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS")
private LocalDateTime startTime;
@ApiModelProperty(value = "格式化开始时间")
@TableField(exist = false)
private String formatTime;
@ApiModelProperty(value = "持续时间,单位秒")
private Double duration;
@ApiModelProperty(value = "特征幅值")
private Double featureAmplitude;
@ApiModelProperty(value = "相别")
private String phase;
@ApiModelProperty(value = "事件描述")
private String eventDescribe;
@ApiModelProperty(value = "波形路径")
private String wavePath;
@ApiModelProperty(value = "暂降核实原因")
@TableField("verify_reason")
private String verifyReason;
@ApiModelProperty(value = "暂降核实原因详情")
@TableField("verify_reason_detail")
private String verifyReasonDetail;
private Double transientValue;
private LocalDateTime createTime;
@ApiModelProperty(value = "用于计算数量")
@TableField(exist = false)
private Integer count;
}

View File

@@ -0,0 +1,41 @@
package com.njcn.gather.event.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
/**
* Description:
* Date: 2024/12/13 15:09【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
/**
* 通信文本消息和二进制缓存区大小
* 避免对接 第三方 报文过大时Websocket 1009 错误
*
* @return
*/
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
// 在此处设置bufferSize
container.setMaxTextMessageBufferSize(10240000);
container.setMaxBinaryMessageBufferSize(10240000);
container.setMaxSessionIdleTimeout(15 * 60000L);
return container;
}
}

View File

@@ -0,0 +1,177 @@
package com.njcn.gather.event.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Description:
* Date: 2024/12/13 15:11【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Slf4j
@Component
@ServerEndpoint(value ="/api/pushMessage/{userIdAndLineIdAndDevId}")
public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set用来存放每个客户端对应的WebSocket对象。
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userIdAndLineIdAndDevId = "";
/**
* 连接建立成
* 功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userIdAndLineIdAndDevId") String userIdAndLineIdAndDevId) {
this.session = session;
this.userIdAndLineIdAndDevId = userIdAndLineIdAndDevId;
if (webSocketMap.containsKey(userIdAndLineIdAndDevId)) {
webSocketMap.remove(userIdAndLineIdAndDevId);
//加入set中
webSocketMap.put(userIdAndLineIdAndDevId, this);
} else {
//加入set中
webSocketMap.put(userIdAndLineIdAndDevId, this);
//在线数加1
addOnlineCount();
}
sendMessage("连接成功");
}
/**
* 连接关闭
* 调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userIdAndLineIdAndDevId)) {
webSocketMap.remove(userIdAndLineIdAndDevId);
//从set中删除
subOnlineCount();
}
log.info("监测点退出:" + userIdAndLineIdAndDevId + ",当前在线监测点数为:" + getOnlineCount());
}
/**
* 收到客户端消
* 息后调用的方法
*
* @param message 客户端发送过来的消息
**/
@OnMessage
public void onMessage(String message, Session session) {
//会每30s发送请求1次
log.info("监测点消息:" + userIdAndLineIdAndDevId + ",报文:" + message);
log.info("监测点连接:" + userIdAndLineIdAndDevId + ",当前在线监测点数为:" + getOnlineCount());
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("监测点错误:" + this.userIdAndLineIdAndDevId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务
* 器主动推送
*/
public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 发送自定
* 义消息
**/
public static void sendInfo(String message, String lineId) {
log.info("发送消息到:" + lineId + ",报文:" + message);
if (StringUtils.isNotBlank(lineId)) {
Map<String, String> stringStringMap = WebSocketServer.filterMapByKey(webSocketMap, lineId);
stringStringMap.forEach((k,v)->{
webSocketMap.get(k).sendMessage(message);
});
} else {
log.error("监测点" + lineId + ",不在线!");
}
}
/**
* 获得此时的
* 在线监测点
*
* @return
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* 在线监测点
* 数加1
*/
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
/**
* 在线监测点
* 数减1
*/
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
/**
* 过滤所有键包含指定字符串的条目
* @param map 原始的Map
* @param substring 要检查的子字符串
* @return 过滤的Map
*/
public static Map<String, String> filterMapByKey(ConcurrentHashMap<String, WebSocketServer> map, String substring) {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, WebSocketServer> entry : map.entrySet()) {
if (entry.getKey().contains(substring)) {
result.put(entry.getKey(), entry.getValue().toString());
}
}
return result;
}
}

View File

@@ -0,0 +1,55 @@
server:
port: 18093
spring:
application:
name: event_smart
datasource:
dynamic:
primary: master
strict: false # 是否严格匹配数据源默认false
druid: # 如果使用Druid连接池
validation-query: SELECT 1 FROM DUAL # 达梦专用校验SQL
initial-size: 10
# 初始化大小,最小,最大
min-idle: 20
maxActive: 500
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 300000
testWhileIdle: true
testOnBorrow: true
testOnReturn: false
# 打开PSCache并且指定每个连接上PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
datasource:
master:
driver-class-name: dm.jdbc.driver.DmDriver
url: jdbc:dm://192.168.1.21:5236/PQSADMIN?useUnicode=true&characterEncoding=utf-8
username: PQSADMINLN
password: Pqsadmin123
#mybatis配置信息
mybatis-plus:
mapper-locations: classpath*:com/njcn/**/mapping/*.xml
#别名扫描
type-aliases-package: com.njcn.gather.event.pojo
configuration:
#驼峰命名
map-underscore-to-camel-case: true
#配置sql日志输出
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
#关闭日志输出
# log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
global-config:
db-config:
#指定主键生成策略
id-type: assign_uuid

View File

@@ -0,0 +1,139 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="20 seconds" debug="false">
<include resource="org/springframework/boot/logging/logback/defaults.xml" />
<springProperty scope="context" name="log.projectName" source="spring.application.name" defaultValue="event_msg"/>
<springProperty scope="context" name="logCommonLevel" source="log.commonLevel" defaultValue="info"/>
<springProperty scope="context" name="logHomeDir" source="log.homeDir" defaultValue="D:\logs"/>
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex"
converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="ec"
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<!--日志输出格式-->
<property name="log.pattern"
value="|-%d{yyyy-MM-dd HH:mm:ss.SSS} ${LOG_LEVEL_PATTERN:-%level} ${log.projectName} -- %t %logger{100}.%M ==> %m%n${Log_EXCEPTION_CONVERSION_WORD:-%ec}}}"/>
<property name="log.maxHistory" value="30"/>
<!--客户端输出日志-->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!--系统中常规的debug日志-->
<!-- 滚动记录文件,先将日志记录到指定文件,当符合某个条件时,将日志记录到其他文件 RollingFileAppender -->
<appender name="DEBUG" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>
${logHomeDir}/${log.projectName}/debug/debug.log
</file>
<!-- 如果日志级别等于配置级别过滤器会根据onMath 和 onMismatch接收或拒绝日志。 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 设置过滤级别 -->
<level>DEBUG</level>
<!-- 用于配置符合过滤条件的操作 -->
<onMatch>ACCEPT</onMatch>
<!-- 用于配置不符合过滤条件的操作 -->
<onMismatch>DENY</onMismatch>
</filter>
<!-- 最常用的滚动策略,它根据时间来制定滚动策略.既负责滚动也负责触发滚动 SizeAndTimeBasedRollingPolicy-->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!--日志输出位置 可相对、和绝对路径 -->
<fileNamePattern>
${logHomeDir}/${log.projectName}/debug/debug.log.%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<!-- 可选节点,控制保留的归档文件的最大数量,超出数量就删除旧文件,假设设置每个月滚动,且<maxHistory>是6
则只保存最近6个月的文件删除之前的旧文件。注意删除旧文件是那些为了归档而创建的目录也会被删除 -->
<maxHistory>${log.maxHistory:-30}</maxHistory>
<!--重启清理日志文件-->
<!-- <cleanHistoryOnStart>true</cleanHistoryOnStart>-->
<!--每个文件最多100MB保留N天的历史记录但最多20GB-->
<!--<totalSizeCap>20GB</totalSizeCap>-->
<!--日志文件最大的大小-->
<!--<MaxFileSize>${log.maxSize}</MaxFileSize>-->
</rollingPolicy>
<encoder>
<pattern>
${log.pattern}
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!--系统中常规的info日志-->
<appender name="INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<file>
${logHomeDir}/${log.projectName}/info/info.log
</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>
${logHomeDir}/${log.projectName}/info/info.log.%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>${log.maxHistory:-30}</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>
${log.pattern}
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!--系统中常规的error日志-->
<appender name="ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>
${logHomeDir}/${log.projectName}/error/error.log
</file>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>
${logHomeDir}/${log.projectName}/error/error.log.%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
<maxHistory>${log.maxHistory:-30}</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>
${log.pattern}
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="org.apache.catalina.startup.DigesterFactory" level="ERROR"/>
<logger name="org.apache.catalina.util.LifecycleBase" level="ERROR"/>
<logger name="org.apache.coyote.http11.Http11NioProtocol" level="WARN"/>
<logger name="com.njcn" level="INFO" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="DEBUG"/>
<appender-ref ref="INFO"/>
<appender-ref ref="ERROR"/>
</logger>
<root level="${logCommonLevel}">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="DEBUG"/>
<appender-ref ref="INFO"/>
<appender-ref ref="ERROR"/>
</root>
</configuration>

View File

@@ -0,0 +1,13 @@
package com.njcn.gather.event;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class EventSmartApplicationTests {
@Test
void contextLoads() {
}
}