5 Commits

Author SHA1 Message Date
xy
528f376f6d 云前置改造微调 2025-11-04 14:09:51 +08:00
xy
1d29a03a3c 云前置改造-补召功能调整 2025-10-23 09:41:49 +08:00
xy
9ea6a00cb5 无线接入bug调整 2025-10-16 16:35:52 +08:00
xy
c33490c4fc 云前置改造-暂态数据补召功能 2025-10-15 20:51:16 +08:00
xy
0d4db672e1 新增云协议相关内容 2025-10-11 09:20:41 +08:00
19 changed files with 431 additions and 47 deletions

View File

@@ -35,6 +35,6 @@ public interface AskDeviceDataFeignClient {
HttpResult<String> askRealData(@RequestParam("nDid") String nDid, @RequestParam("idx") Integer idx, @RequestParam("clDId") Integer clDId);
@PostMapping("/askCldRealData")
HttpResult<String> askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId, @RequestParam("nodeId") String nodeId);
HttpResult<String> askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId, @RequestParam("nodeId") String nodeId, @RequestParam("idx") Integer idx);
}

View File

@@ -75,7 +75,7 @@ public class AskDeviceDataClientFallbackFactory implements FallbackFactory<AskDe
}
@Override
public HttpResult<String> askCldRealData(String devId, String lineId, String nodeId) {
public HttpResult<String> askCldRealData(String devId, String lineId, String nodeId, Integer idx) {
log.error("{}异常,降级处理,异常为:{}","询问云前置实时数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}

View File

@@ -56,7 +56,7 @@ public enum AccessResponseEnum {
CTRL_DICT_MISSING("A0307","Ctrl字典数据缺失!"),
WAVE_INFO_MISSING("A0307","波形参数缺失!"),
MODEL_MISS("A0308","模板信息缺失!"),
MODEL_MISS("A0308","询问模板信息超时,设备未响应!"),
MODEL_VERSION_ERROR("A0308","询问装置模板信息错误"),
UPLOAD_ERROR("A0308","平台上送文件异常"),
RELOAD_UPLOAD_ERROR("A0308","平台重新上送文件异常"),

View File

@@ -129,11 +129,12 @@ public class AskDeviceDataController extends BaseController {
@ApiImplicitParams({
@ApiImplicitParam(name = "devId", value = "装置id"),
@ApiImplicitParam(name = "lineId", value = "监测点id"),
@ApiImplicitParam(name = "nodeId", value = "前置id")
@ApiImplicitParam(name = "nodeId", value = "前置id"),
@ApiImplicitParam(name = "idx", value = "数据集编号")
})
public HttpResult<String> askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId, @RequestParam("nodeId") String nodeId){
public HttpResult<String> askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId, @RequestParam("nodeId") String nodeId, @RequestParam("idx") Integer idx){
String methodDescribe = getMethodDescribe("askCldRealData");
askDeviceDataService.askCldRealData(devId,lineId,nodeId);
askDeviceDataService.askCldRealData(devId,lineId,nodeId,idx);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}

View File

@@ -22,5 +22,5 @@ public interface AskDeviceDataService {
*/
void askRealData(String nDid, Integer idx, Integer size);
void askCldRealData(String devId, String lineId, String nodeId);
void askCldRealData(String devId, String lineId, String nodeId, Integer idx);
}

View File

@@ -213,7 +213,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService {
}
@Override
public void askCldRealData(String devId, String lineId, String nodeId) {
public void askCldRealData(String devId, String lineId, String nodeId, Integer idx) {
RealDataMessage realDataMessage = new RealDataMessage();
realDataMessage.setDevSeries(devId);
int lastDigit = Character.getNumericValue(lineId.charAt(lineId.length() - 1));
@@ -221,6 +221,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService {
realDataMessage.setRealData(true);
realDataMessage.setSoeData(true);
realDataMessage.setLimit(20);
realDataMessage.setIdx(idx);
realDataMessageTemplate.sendMember(realDataMessage,nodeId);
}

View File

@@ -2,6 +2,7 @@ package com.njcn.access.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
@@ -21,11 +22,13 @@ import com.njcn.access.utils.CRC32Utils;
import com.njcn.access.utils.JsonUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLogsFeignClient;
import com.njcn.csdevice.api.DevModelFeignClient;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.enums.AlgorithmResponseEnum;
import com.njcn.csdevice.pojo.param.CsDevModelAddParm;
import com.njcn.csdevice.pojo.po.*;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsDataSet;
import com.njcn.csdevice.pojo.po.CsDevModelPO;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.oss.constant.OssPath;
import com.njcn.oss.utils.FileStorageUtil;
import com.njcn.redis.pojo.enums.AppRedisKey;
@@ -46,7 +49,6 @@ import org.springframework.web.multipart.MultipartFile;
import java.sql.Date;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
@@ -78,6 +80,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
private final MqttPublisher publisher;
private final ICsTopicService csTopicService;
private final RedisUtil redisUtil;
private final EquipmentFeignClient eequipmentFeignClient;
private final DevModelRelationFeignClient devModelRelationFeignClient;
private final CsLineFeignClient csLineFeignClient;
@Override
@Transactional(rollbackFor = {Exception.class})
@@ -115,6 +120,21 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
analysisDataSet(templateDto,csDevModelPo.getId());
//3.录入监测点模板表(记录当前模板有几个监测点治理类型的模板目前规定1个监测点电能质量模板根据逻辑子设备来)
addCsLineModel(templateDto,csDevModelPo.getId());
//4.如果是云前置的模板录入,重新录入完需要将模板关系信息重新录入
if (Objects.equals(devType,DicDataEnum.DEV_CLD.getCode())) {
List<CsEquipmentDeliveryPO> list = eequipmentFeignClient.getAll().getData();
if (CollectionUtil.isNotEmpty(list)) {
List<String> devList = list.stream()
.filter(item -> Objects.equals(item.getDevAccessMethod(),"CLD"))
.map(CsEquipmentDeliveryPO::getId)
.collect(Collectors.toList());
devModelRelationFeignClient.updateDataByList(devList,csDevModelPo.getId());
Object object = redisUtil.getObjectByKey("setId:" + csDevModelPo.getId());
if (ObjectUtil.isNotNull(object)) {
csLineFeignClient.updateDataByList(devList,csDevModelPo.getId(),object.toString());
}
}
}
csLogsFeignClient.addUserLog(logDto);
} catch (Exception e) {
logDto.setResult(0);
@@ -943,6 +963,11 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
}
if(CollectionUtil.isNotEmpty(setList)) {
csDataSetService.addList(setList);
setList.forEach(item->{
if (Objects.equals(item.getName(),"统计数据")) {
redisUtil.saveByKeyWithExpire("setId:" + pId,item.getId(),30L);
}
});
}
if(CollectionUtil.isNotEmpty(arrayList)) {
csDataArrayService.addList(arrayList);

View File

@@ -38,7 +38,6 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -328,7 +327,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
param.setNDid(nDid);
param.setStatus(1);
param.setRunStatus(1);
param.setProcess(2);
boolean isConnectDev = DicDataEnum.CONNECT_DEV.getCode().equals(dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevType()).getData().getCode());
if (isConnectDev) {
param.setProcess(2);
} else {
param.setProcess(4);
}
csEquipmentDeliveryService.devResetFactory(param);
//清除关系表
QueryWrapper<CsLedger> csLedgerQueryWrapper = new QueryWrapper<>();
@@ -374,12 +378,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
@Transactional(rollbackFor = Exception.class)
public String wlDevRegister(String nDid) {
String result = "fail";
// 设备状态判断
checkDeviceStatus(nDid);
// 询问设备支持的主题信息,并将支持的主题入库
askAndStoreTopics(nDid);
// MQTT询问装置用的模板并判断库中是否存在模板
checkDeviceModel(nDid);
// 根据模板接入设备
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
@@ -387,9 +385,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
logDto.setOperate("便携式设备"+nDid+"注册、接入");
logDto.setResult(1);
try {
// 设备状态判断
checkDeviceStatus(nDid);
// 询问设备支持的主题信息,并将支持的主题入库
askAndStoreTopics(nDid);
Thread.sleep(2000);
// MQTT询问装置用的模板并判断库中是否存在模板
checkDeviceModel(nDid);
Thread.sleep(2000);
//获取版本
String version = csTopicService.getVersion(nDid);
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
List<CsLinePO> csLinePoList = new ArrayList<>();
//1.录入装置台账信息
@@ -441,6 +444,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
//5.发起自动接入请求
Thread.sleep(2000);
//先获取版本
// String version = csTopicService.getVersion(nDid);
String version = "V1";
devAccessAskTemplate(nDid,version,1);
//6.修改流程,便携式设备接入成功即为实际环境
csEquipmentDeliveryService.updateProcessBynDid(nDid,4);

View File

@@ -1,7 +1,6 @@
package com.njcn.access.service.impl;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -18,7 +17,10 @@ import lombok.RequiredArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -151,7 +153,6 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
QueryWrapper<CsEquipmentDeliveryPO> wrapper = new QueryWrapper<>();
wrapper.select("DISTINCT CONCAT(node_id, node_process) as concatenated");
wrapper.eq("usage_status", 1);
wrapper.eq("run_status", 2);
wrapper.isNotNull("node_id");
return baseMapper.selectObjs(wrapper)
.stream()

View File

@@ -102,12 +102,13 @@ public class RtServiceImpl implements IRtService {
baseRealDataSet.setDataLevel(dataSet.getDataLevel());
long timestamp = item.getDataTimeSec();
baseRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false);
publisher.send("/Web/RealData/" + userId, new Gson().toJson(baseRealDataSet), 1, false);
});
}
}
//fixme 目前实时数据只有基础数据和谐波数据,后期拓展,这边需要再判断
else {
long timestamp;
//用户Id
String userId = redisUtil.getObjectByKey("rtDataUserId:"+lineId).toString();
HarmRealDataSet harmRealDataSet = harmData(dataArrayList,item,dataSet.getDataLevel(),po.getCtRatio());
@@ -116,7 +117,11 @@ public class RtServiceImpl implements IRtService {
harmRealDataSet.setPt(po.getPtRatio().floatValue());
harmRealDataSet.setCt(po.getCtRatio().floatValue());
harmRealDataSet.setDataLevel(dataSet.getDataLevel());
long timestamp = item.getDataTimeSec() - 8*3600;
if (ObjectUtil.isNotNull(po.getLineNo())) {
timestamp = item.getDataTimeSec();
} else {
timestamp = item.getDataTimeSec() - 8*3600;
}
harmRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(harmRealDataSet), 1, false);
}
@@ -323,6 +328,8 @@ public class RtServiceImpl implements IRtService {
}
} else if (Objects.equals(item.getHarmName(),"Pq_RmsFundU")) {
harmRealDataSet.setData1(FloatUtils.get2Float(item.getData()));
} else if (Objects.equals(item.getHarmName(),"Pq_ThdU")) {
harmRealDataSet.setData1(FloatUtils.get2Float(item.getData()));
} else {
String numberStr = item.getHarmName().substring(item.getHarmName().lastIndexOf('_') + 1);
String fieldName = "data" + numberStr;

View File

@@ -1,16 +1,10 @@
package com.njcn.zlevent.api;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.mq.message.CldLogMessage;
import com.njcn.zlevent.api.fallback.EventClientFallbackFactory;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

View File

@@ -11,10 +11,10 @@ import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
/**
@@ -33,12 +33,19 @@ public class CsEventServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> im
@Transactional(rollbackFor = Exception.class)
public List<String> updateCsEvent(CsEventParam csEventParam) {
List<String> eventList = new ArrayList<>();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
LocalDateTime dateTime = LocalDateTime.parse(csEventParam.getStartTime(), formatter);
// 减去1毫秒
LocalDateTime newDateTime = dateTime.minusNanos(1000000);
String startTime = newDateTime.format(formatter);
//1.将波形文件关联事件
LambdaUpdateWrapper<CsEventPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.set(CsEventPO::getWavePath,csEventParam.getPath()).eq(CsEventPO::getLineId,csEventParam.getLineId())
.eq(CsEventPO::getDeviceId,csEventParam.getDeviceId())
.in(CsEventPO::getType, Arrays.asList(0,1))
.between(CsEventPO::getStartTime,csEventParam.getStartTime(),csEventParam.getEndTime());
.between(CsEventPO::getStartTime,startTime,csEventParam.getEndTime());
if (Objects.nonNull(csEventParam.getLocation())) {
lambdaUpdateWrapper.eq(CsEventPO::getLocation, csEventParam.getLocation());
}

View File

@@ -0,0 +1,148 @@
package com.njcn.message.consumer;
import com.njcn.csdevice.api.CsTerminalReplyFeignClient;
import com.njcn.csdevice.param.IcdBzReplyParam;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.mq.constant.BusinessTopic;
import com.njcn.mq.constant.MessageStatus;
import com.njcn.mq.message.BzMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 类的介绍:接收前置响应台账更新相关信息
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/11 15:32
*/
@Service
@RocketMQMessageListener(
topic = BusinessTopic.REPLY_TOPIC,
consumerGroup = "RECALL",
selectorExpression = "RECALL",
consumeMode = ConsumeMode.ORDERLY,
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class BzConsumer extends EnhanceConsumerMessageHandler<BzMessage> implements RocketMQListener<BzMessage> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Resource
private CsTerminalReplyFeignClient csTerminalReplyFeignClient;
@Override
public void handleMessage(BzMessage message) {
log.info("分发至补召响应处理程序");
//收到消息修改(cs_terminal_reply)
IcdBzReplyParam param = new IcdBzReplyParam();
param.setId(message.getGuid());
param.setDeviceId(message.getTerminalId());
param.setLineId(message.getMonitorId());
param.setCode(message.getCode());
param.setMsg(message.getResult());
if (param.getCode() == 200) {
param.setState(1);
} else {
param.setState(2);
}
csTerminalReplyFeignClient.updateBzData(param);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(BzMessage message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(BzMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(BzMessage message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
/***
* 调用父类handler处理消息的元信息
*/
@Override
public void onMessage(BzMessage message) {
super.dispatchMessage(message);
}
}

View File

@@ -12,6 +12,7 @@ import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@@ -31,6 +32,7 @@ import java.util.Objects;
topic = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
consumerGroup = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
consumeThreadNumber = 10,
consumeMode = ConsumeMode.ORDERLY,
enableMsgTrace = true
)
@Slf4j

View File

@@ -12,6 +12,7 @@ import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import com.njcn.zlevent.api.EventFeignClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@@ -31,6 +32,7 @@ import java.util.Objects;
topic = BusinessTopic.LOG_TOPIC,
consumerGroup = BusinessTopic.LOG_TOPIC,
consumeThreadNumber = 10,
consumeMode = ConsumeMode.ORDERLY,
enableMsgTrace = true
)
@Slf4j

View File

@@ -11,6 +11,7 @@ import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@@ -30,6 +31,7 @@ import java.util.Objects;
topic = BusinessTopic.HEART_BEAT_TOPIC,
consumerGroup = BusinessTopic.HEART_BEAT_TOPIC,
consumeThreadNumber = 10,
consumeMode = ConsumeMode.ORDERLY,
enableMsgTrace = true
)
@Slf4j

View File

@@ -47,6 +47,8 @@ public class RealDataConsumer extends EnhanceConsumerMessageHandler<AppAutoDataM
@Override
protected void handleMessage(AppAutoDataMessage appAutoDataMessage) {
log.info("分发至实时数据");
String lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
redisUtil.saveByKeyWithExpire("devResponse:" + lineId,200,5L);
rtFeignClient.analysis(appAutoDataMessage);
}

View File

@@ -0,0 +1,155 @@
package com.njcn.message.consumer;
import cn.hutool.core.util.ObjectUtil;
import com.njcn.csdevice.api.CsTerminalReplyFeignClient;
import com.njcn.csdevice.param.IcdBzReplyParam;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.mq.constant.BusinessTopic;
import com.njcn.mq.constant.MessageStatus;
import com.njcn.mq.message.UpdateLedgerMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
/**
* 类的介绍:接收前置响应台账更新相关信息
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/11 15:32
*/
@Service
@RocketMQMessageListener(
topic = BusinessTopic.REPLY_TOPIC,
consumerGroup = "LEDGER",
selectorExpression = "LEDGER",
consumeMode = ConsumeMode.ORDERLY,
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class UpdateLedgerConsumer extends EnhanceConsumerMessageHandler<UpdateLedgerMessage> implements RocketMQListener<UpdateLedgerMessage> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Resource
private CsTerminalReplyFeignClient csTerminalReplyFeignClient;
@Override
public void handleMessage(UpdateLedgerMessage updateLedgerMessage) {
log.info("分发至更新台账响应处理程序");
//收到消息修改(cs_terminal_reply)
List<UpdateLedgerMessage.HandleData> data = updateLedgerMessage.getData();
if (ObjectUtil.isNotEmpty(data)) {
data.forEach(item->{
IcdBzReplyParam param = new IcdBzReplyParam();
param.setId(updateLedgerMessage.getGuid());
param.setDeviceId(item.getDeviceId());
param.setCode(item.getCode());
param.setMsg(item.getResult());
if (item.getCode() == 200) {
param.setState(1);
} else {
param.setState(2);
}
csTerminalReplyFeignClient.updateData(param);
});
}
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(UpdateLedgerMessage message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(UpdateLedgerMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(UpdateLedgerMessage message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
/***
* 调用父类handler处理消息的元信息
*/
@Override
public void onMessage(UpdateLedgerMessage message) {
super.dispatchMessage(message);
}
}

49
pom.xml
View File

@@ -31,16 +31,47 @@
<description>物联网交互模块</description>
<properties>
<!--中间件目标地址-->
<middle.server.url>192.168.1.22</middle.server.url>
<!--微服务模块发布地址-->
<service.server.url>127.0.0.1</service.server.url>
<!--docker仓库地址-->
<docker.server.url>192.168.1.22</docker.server.url>
<!--nacos的ip:port-->
<!--103本地-->
<!-- <middle.server.url>192.168.1.103</middle.server.url>-->
<!-- <service.server.url>192.168.1.126</service.server.url>-->
<!-- <docker.server.url>192.168.1.103</docker.server.url>-->
<!-- <nacos.url>${middle.server.url}:18848</nacos.url>-->
<!-- <nacos.namespace>72972c43-3c20-4452-a261-66624e17da97</nacos.namespace>-->
<!--103线上-->
<middle.server.url>192.168.1.103</middle.server.url>
<service.server.url>192.168.1.103</service.server.url>
<docker.server.url>192.168.1.103</docker.server.url>
<nacos.url>${middle.server.url}:18848</nacos.url>
<!--服务器发布内容为空-->
<nacos.namespace>415a1c87-33aa-47bd-8e25-13cc456c87ed</nacos.namespace>
<nacos.namespace></nacos.namespace>
<!-- <middle.server.url>192.168.1.22</middle.server.url>-->
<!-- <service.server.url>192.168.1.126</service.server.url>-->
<!-- <docker.server.url>192.168.1.22</docker.server.url>-->
<!-- <nacos.url>${middle.server.url}:18848</nacos.url>-->
<!-- <nacos.namespace>b0b0dedf-baa9-407f-bef6-988b9e0a640d</nacos.namespace>-->
<!--浙江现场-->
<!-- <middle.server.url>192.168.4.151</middle.server.url>-->
<!-- <service.server.url>192.168.4.151</service.server.url>-->
<!-- <docker.server.url>192.168.4.151</docker.server.url>-->
<!-- <nacos.url>${middle.server.url}:18848</nacos.url>-->
<!-- <nacos.namespace></nacos.namespace>-->
<!--102-->
<!-- <middle.server.url>192.168.1.102</middle.server.url>-->
<!-- <service.server.url>127.0.0.1</service.server.url>-->
<!-- <docker.server.url>192.168.1.102</docker.server.url>-->
<!-- <nacos.url>${middle.server.url}:18848</nacos.url>-->
<!-- <nacos.namespace>c208a65e-1578-4372-b7c0-97fecd323fe6</nacos.namespace>-->
<!-- <middle.server.url>192.168.1.27</middle.server.url>-->
<!-- <service.server.url>127.0.0.1</service.server.url>-->
<!-- <docker.server.url>192.168.1.27</docker.server.url>-->
<!-- <nacos.url>${middle.server.url}:18848</nacos.url>-->
<!-- <nacos.namespace></nacos.namespace>-->
<!--sentinel:port-->
<sentinel.url>${middle.server.url}:8080</sentinel.url>
<!--网关地址主要用于配置swagger中认证token-->