5 Commits

Author SHA1 Message Date
xy
cd8cf60683 云前置改造 2025-11-14 10:02:52 +08:00
xy
528f376f6d 云前置改造微调 2025-11-04 14:09:51 +08:00
xy
1d29a03a3c 云前置改造-补召功能调整 2025-10-23 09:41:49 +08:00
xy
9ea6a00cb5 无线接入bug调整 2025-10-16 16:35:52 +08:00
xy
c33490c4fc 云前置改造-暂态数据补召功能 2025-10-15 20:51:16 +08:00
10 changed files with 241 additions and 52 deletions

View File

@@ -380,32 +380,27 @@ public class MqttMessageHandler {
if (Objects.equals(res.getDid(),1)){
log.info("{},设备数据应答--->更新治理监测点信息和设备容量", nDid);
List<CsDevCapacityPO> list3 = new ArrayList<>();
devInfo.forEach(item->{
//1.更新治理监测点信息
CsLineParam csLineParam = new CsLineParam();
if (Objects.equals(item.getClDid(),0)){
csLineParam.setLineId(nDid.concat("0"));
boolean hasZeroClDid = devInfo.stream().anyMatch(item -> item.getClDid() == 0);
//治理设备
if (hasZeroClDid) {
devInfo.forEach(item->{
if (Objects.equals(item.getClDid(),0)){
updateLineInfo(nDid,item);
}
//2.录入各个模块设备容量
CsDevCapacityPO csDevCapacity = new CsDevCapacityPO();
csDevCapacity.setLineId(nDid.concat("0"));
csDevCapacity.setCldid(item.getClDid());
csDevCapacity.setCapacity(Objects.isNull(item.getCapacityA())?0.0:item.getCapacityA());
list3.add(csDevCapacity);
} else {
csLineParam.setLineId(nDid.concat(item.getClDid().toString()));
}
csLineParam.setVolGrade(item.getVolGrade());
csLineParam.setPtRatio(item.getPtRatio());
csLineParam.setCtRatio(item.getCtRatio());
csLineParam.setConType(item.getConType());
csLineParam.setLineInterval(item.getStatCycle());
csLineFeignClient.updateLine(csLineParam);
//生成监测点限值
Overlimit overlimit = COverlimitUtil.globalAssemble(item.getVolGrade().floatValue(),10f,10f,10f,0,0);
overlimit.setId(nDid.concat(item.getClDid().toString()));
overlimitMapper.deleteById(nDid.concat(item.getClDid().toString()));
overlimitMapper.insert(overlimit);
});
});
}
//其余设备
else {
devInfo.forEach(item->{
updateLineInfo(nDid,item);
});
}
if (CollectionUtil.isNotEmpty(list3)) {
devCapacityFeignClient.addList(list3);
//3.更新设备模块个数
@@ -416,14 +411,7 @@ public class MqttMessageHandler {
logDto.setOperate(nDid + "更新电网侧、负载侧监测点信息");
//1.更新电网侧、负载侧监测点相关信息
devInfo.forEach(item->{
CsLineParam csLineParam = new CsLineParam();
csLineParam.setLineId(nDid.concat(item.getClDid().toString()));
csLineParam.setVolGrade(item.getVolGrade());
csLineParam.setPtRatio(item.getPtRatio());
csLineParam.setCtRatio(item.getCtRatio());
csLineParam.setConType(item.getConType());
csLineParam.setLineInterval(item.getStatCycle());
csLineFeignClient.updateLine(csLineParam);
updateLineInfo(nDid,item);
});
}
}
@@ -468,6 +456,22 @@ public class MqttMessageHandler {
}
}
public void updateLineInfo(String nDid,RspDataDto.LdevInfo item) {
CsLineParam csLineParam = new CsLineParam();
csLineParam.setLineId(nDid.concat(item.getClDid().toString()));
csLineParam.setVolGrade(item.getVolGrade());
csLineParam.setPtRatio(item.getPtRatio());
csLineParam.setCtRatio(item.getCtRatio());
csLineParam.setConType(item.getConType());
csLineParam.setLineInterval(item.getStatCycle());
csLineFeignClient.updateLine(csLineParam);
//生成监测点限值
Overlimit overlimit = COverlimitUtil.globalAssemble(item.getVolGrade().floatValue(),10f,10f,10f,0,0);
overlimit.setId(nDid.concat(item.getClDid().toString()));
overlimitMapper.deleteById(nDid.concat(item.getClDid().toString()));
overlimitMapper.insert(overlimit);
}
/**
* 装置心跳 && 主动数据上送
* fixme 这边由于接收文件数据时间跨度会很长,途中有其他请求进来会中断之前的程序,目前是记录中断的位置,等处理完成再继续请求接收文件

View File

@@ -38,7 +38,6 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -328,7 +327,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
param.setNDid(nDid);
param.setStatus(1);
param.setRunStatus(1);
param.setProcess(2);
boolean isConnectDev = DicDataEnum.CONNECT_DEV.getCode().equals(dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevType()).getData().getCode());
if (isConnectDev) {
param.setProcess(2);
} else {
param.setProcess(4);
}
csEquipmentDeliveryService.devResetFactory(param);
//清除关系表
QueryWrapper<CsLedger> csLedgerQueryWrapper = new QueryWrapper<>();
@@ -374,12 +378,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
@Transactional(rollbackFor = Exception.class)
public String wlDevRegister(String nDid) {
String result = "fail";
// 设备状态判断
checkDeviceStatus(nDid);
// 询问设备支持的主题信息,并将支持的主题入库
askAndStoreTopics(nDid);
// MQTT询问装置用的模板并判断库中是否存在模板
checkDeviceModel(nDid);
// 根据模板接入设备
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
@@ -387,9 +385,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
logDto.setOperate("便携式设备"+nDid+"注册、接入");
logDto.setResult(1);
try {
// 设备状态判断
checkDeviceStatus(nDid);
// 询问设备支持的主题信息,并将支持的主题入库
askAndStoreTopics(nDid);
Thread.sleep(2000);
// MQTT询问装置用的模板并判断库中是否存在模板
checkDeviceModel(nDid);
Thread.sleep(2000);
//获取版本
String version = csTopicService.getVersion(nDid);
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
List<CsLinePO> csLinePoList = new ArrayList<>();
//1.录入装置台账信息
@@ -441,6 +444,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
//5.发起自动接入请求
Thread.sleep(2000);
//先获取版本
// String version = csTopicService.getVersion(nDid);
String version = "V1";
devAccessAskTemplate(nDid,version,1);
//6.修改流程,便携式设备接入成功即为实际环境
csEquipmentDeliveryService.updateProcessBynDid(nDid,4);

View File

@@ -1,7 +1,6 @@
package com.njcn.access.service.impl;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -18,7 +17,10 @@ import lombok.RequiredArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -151,7 +153,6 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
QueryWrapper<CsEquipmentDeliveryPO> wrapper = new QueryWrapper<>();
wrapper.select("DISTINCT CONCAT(node_id, node_process) as concatenated");
wrapper.eq("usage_status", 1);
wrapper.eq("run_status", 2);
wrapper.isNotNull("node_id");
return baseMapper.selectObjs(wrapper)
.stream()

View File

@@ -57,6 +57,11 @@ public class StatServiceImpl implements IStatService {
private final RedisUtil redisUtil;
private final ChannelObjectUtil channelObjectUtil;
private final CsLineLatestDataFeignClient csLineLatestDataFeignClient;
private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{
put("AB", "A");
put("BC", "B");
put("CA", "C");
}};
@Override
@Transactional(rollbackFor = Exception.class)
@@ -215,10 +220,11 @@ public class StatServiceImpl implements IStatService {
String tableName = map.get(dataArrayList.get(i).getName());
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.LINE_ID,lineId);
tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase());
tags.put(InfluxDBTableConstant.PHASIC_TYPE,Objects.isNull(PHASE_MAPPING.get(dataArrayList.get(i).getPhase()))?dataArrayList.get(i).getPhase():PHASE_MAPPING.get(dataArrayList.get(i).getPhase()));
tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod);
tags.put(InfluxDBTableConstant.CL_DID,clDid.toString());
tags.put(InfluxDBTableConstant.PROCESS,process.toString());
tags.put(InfluxDBTableConstant.QUALITY_FLAG,"0");
Map<String,Object> fields = new HashMap<>();
//这边特殊处理如果数据为3.14159则将数据置为null
fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
@@ -244,4 +250,7 @@ public class StatServiceImpl implements IStatService {
return urlList;
}
//相别处理
}

View File

@@ -11,10 +11,10 @@ import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
/**
@@ -33,12 +33,19 @@ public class CsEventServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> im
@Transactional(rollbackFor = Exception.class)
public List<String> updateCsEvent(CsEventParam csEventParam) {
List<String> eventList = new ArrayList<>();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
LocalDateTime dateTime = LocalDateTime.parse(csEventParam.getStartTime(), formatter);
// 减去1毫秒
LocalDateTime newDateTime = dateTime.minusNanos(1000000);
String startTime = newDateTime.format(formatter);
//1.将波形文件关联事件
LambdaUpdateWrapper<CsEventPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.set(CsEventPO::getWavePath,csEventParam.getPath()).eq(CsEventPO::getLineId,csEventParam.getLineId())
.eq(CsEventPO::getDeviceId,csEventParam.getDeviceId())
.in(CsEventPO::getType, Arrays.asList(0,1))
.between(CsEventPO::getStartTime,csEventParam.getStartTime(),csEventParam.getEndTime());
.between(CsEventPO::getStartTime,startTime,csEventParam.getEndTime());
if (Objects.nonNull(csEventParam.getLocation())) {
lambdaUpdateWrapper.eq(CsEventPO::getLocation, csEventParam.getLocation());
}

View File

@@ -0,0 +1,148 @@
package com.njcn.message.consumer;
import com.njcn.csdevice.api.CsTerminalReplyFeignClient;
import com.njcn.csdevice.param.IcdBzReplyParam;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.mq.constant.BusinessTopic;
import com.njcn.mq.constant.MessageStatus;
import com.njcn.mq.message.BzMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 类的介绍:接收前置响应台账更新相关信息
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/11 15:32
*/
@Service
@RocketMQMessageListener(
topic = BusinessTopic.REPLY_TOPIC,
consumerGroup = "RECALL",
selectorExpression = "RECALL",
consumeMode = ConsumeMode.ORDERLY,
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class BzConsumer extends EnhanceConsumerMessageHandler<BzMessage> implements RocketMQListener<BzMessage> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Resource
private CsTerminalReplyFeignClient csTerminalReplyFeignClient;
@Override
public void handleMessage(BzMessage message) {
log.info("分发至补召响应处理程序");
//收到消息修改(cs_terminal_reply)
IcdBzReplyParam param = new IcdBzReplyParam();
param.setId(message.getGuid());
param.setDeviceId(message.getTerminalId());
param.setLineId(message.getMonitorId());
param.setCode(message.getCode());
param.setMsg(message.getResult());
if (param.getCode() == 200) {
param.setState(1);
} else {
param.setState(2);
}
csTerminalReplyFeignClient.updateBzData(param);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(BzMessage message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(BzMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(BzMessage message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
/***
* 调用父类handler处理消息的元信息
*/
@Override
public void onMessage(BzMessage message) {
super.dispatchMessage(message);
}
}

View File

@@ -12,6 +12,7 @@ import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@@ -31,6 +32,7 @@ import java.util.Objects;
topic = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
consumerGroup = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
consumeThreadNumber = 10,
consumeMode = ConsumeMode.ORDERLY,
enableMsgTrace = true
)
@Slf4j

View File

@@ -12,6 +12,7 @@ import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import com.njcn.zlevent.api.EventFeignClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@@ -31,6 +32,7 @@ import java.util.Objects;
topic = BusinessTopic.LOG_TOPIC,
consumerGroup = BusinessTopic.LOG_TOPIC,
consumeThreadNumber = 10,
consumeMode = ConsumeMode.ORDERLY,
enableMsgTrace = true
)
@Slf4j

View File

@@ -11,6 +11,7 @@ import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@@ -30,6 +31,7 @@ import java.util.Objects;
topic = BusinessTopic.HEART_BEAT_TOPIC,
consumerGroup = BusinessTopic.HEART_BEAT_TOPIC,
consumeThreadNumber = 10,
consumeMode = ConsumeMode.ORDERLY,
enableMsgTrace = true
)
@Slf4j

View File

@@ -2,6 +2,7 @@ package com.njcn.message.consumer;
import cn.hutool.core.util.ObjectUtil;
import com.njcn.csdevice.api.CsTerminalReplyFeignClient;
import com.njcn.csdevice.param.IcdBzReplyParam;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.mq.constant.BusinessTopic;
@@ -13,10 +14,10 @@ import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
@@ -32,7 +33,9 @@ import java.util.Objects;
@Service
@RocketMQMessageListener(
topic = BusinessTopic.REPLY_TOPIC,
consumerGroup = BusinessTopic.REPLY_TOPIC,
consumerGroup = "LEDGER",
selectorExpression = "LEDGER",
consumeMode = ConsumeMode.ORDERLY,
consumeThreadNumber = 10,
enableMsgTrace = true
)
@@ -47,18 +50,23 @@ public class UpdateLedgerConsumer extends EnhanceConsumerMessageHandler<UpdateLe
private CsTerminalReplyFeignClient csTerminalReplyFeignClient;
@Override
@Transactional(rollbackFor = Exception.class)
public void handleMessage(UpdateLedgerMessage updateLedgerMessage) {
log.info("分发至更新台账响应处理程序");
//收到消息修改(cs_terminal_reply)
List<UpdateLedgerMessage.HandleData> data = updateLedgerMessage.getData();
if (ObjectUtil.isNotEmpty(data)) {
data.forEach(item->{
IcdBzReplyParam param = new IcdBzReplyParam();
param.setId(updateLedgerMessage.getGuid());
param.setDeviceId(item.getDeviceId());
param.setCode(item.getCode());
param.setMsg(item.getResult());
if (item.getCode() == 200) {
csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),1,item.getDeviceId());
param.setState(1);
} else {
csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),2,item.getDeviceId());
param.setState(2);
}
csTerminalReplyFeignClient.updateData(param);
});
}
}