refactor(access): 优化设备接入服务的日志记录和性能

- 移除旧的字典树Feign客户端依赖,改为从Redis获取字典数据
- 添加ChannelObjectUtil和LogMessageTemplate用于数据转换和日志发送
- 在设备接入过程中增加成功和失败设备ID的统计功能
- 实现批量获取主题版本信息以提高性能
- 重构日志记录方式,统一使用LogMessageTemplate发送日志
- 优化设备注册和接入流程中的日志记录和错误处理
- 移除旧的CsLogsFeignClient相关代码并更新日志DTO结构
This commit is contained in:
xy
2026-06-11 19:54:41 +08:00
parent 1eb2db4f40
commit cfd22f3ea8
12 changed files with 488 additions and 332 deletions

View File

@@ -22,9 +22,11 @@ import com.njcn.access.pojo.dto.file.FileRedisDto;
import com.njcn.access.pojo.param.ReqAndResParam;
import com.njcn.access.pojo.po.CsLineModel;
import com.njcn.access.pojo.po.CsTopic;
import com.njcn.access.service.*;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsLineModelService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.IHeartbeatService;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
@@ -36,10 +38,8 @@ import com.njcn.device.biz.utils.COverlimitUtil;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.mq.message.AppFileMessage;
import com.njcn.mq.template.AppAutoDataMessageTemplate;
import com.njcn.mq.template.AppEventMessageTemplate;
import com.njcn.mq.template.AppFileMessageTemplate;
import com.njcn.mq.template.AppFileStreamMessageTemplate;
import com.njcn.mq.message.LogMessage;
import com.njcn.mq.template.*;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.rt.api.RtFeignClient;
@@ -86,10 +86,8 @@ public class MqttMessageHandler {
private final DataSetFeignClient dataSetFeignClient;
private final AppAutoDataMessageTemplate appAutoDataMessageTemplate;
private final AppEventMessageTemplate appEventMessageTemplate;
private final CsLogsFeignClient csLogsFeignClient;
private final AppFileMessageTemplate appFileMessageTemplate;
private final AppFileStreamMessageTemplate appFileStreamMessageTemplate;
private final ICsDeviceOnlineLogsService onlineLogsService;
private final CsSoftInfoFeignClient csSoftInfoFeignClient;
private final CsLineFeignClient csLineFeignClient;
private final DevCapacityFeignClient devCapacityFeignClient;
@@ -100,6 +98,7 @@ public class MqttMessageHandler {
private final RtFeignClient rtFeignClient;
private final CsCommunicateFeignClient csCommunicateFeignClient;
private final IHeartbeatService heartbeatService;
private final LogMessageTemplate logMessageTemplate;
@Autowired
Validator validator;
@@ -107,14 +106,9 @@ public class MqttMessageHandler {
@Transactional(rollbackFor = Exception.class)
public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){
//日志记录
DeviceLogDTO logDto = new DeviceLogDTO();
try{
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
} catch (Exception e) {
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
}
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统");
logDto.setLoginName("系统");
logDto.setOperate(nDid + "设备主题录入");
logDto.setResult(1);
//业务流程开始
@@ -122,9 +116,9 @@ public class MqttMessageHandler {
ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class);
//检验传递的参数是否准确
Set<ConstraintViolation<ReqAndResParam.Res>> validate = validator.validate(res);
validate.forEach(constraintViolation -> {
System.out.println(constraintViolation.getMessage());
});
// validate.forEach(constraintViolation -> {
// System.out.println(constraintViolation.getMessage());
// });
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
if (Objects.equals(res.getType(), Integer.parseInt(TypeEnum.TYPE_16.getCode()))){
List<CsTopic> list = new ArrayList<>();
@@ -141,18 +135,18 @@ public class MqttMessageHandler {
list.add(csTopic);
});
csTopicService.addTopic(nDid,list);
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
} else {
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
csLogsFeignClient.addUserLog(logDto);
log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
logMessageTemplate.sendMember(logDto);
//log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
}
} else {
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.RESPONSE_ERROR.getMessage());
csLogsFeignClient.addUserLog(logDto);
log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage());
logMessageTemplate.sendMember(logDto);
//log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage());
}
}
@@ -169,14 +163,9 @@ public class MqttMessageHandler {
public void devOperation(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){
log.info("收到注册应答响应--->{}", nDid);
//日志记录
DeviceLogDTO logDto = new DeviceLogDTO();
try{
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
} catch (Exception e) {
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
}
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统");
logDto.setLoginName("系统");
logDto.setOperate("收到设备"+nDid+"注册应答响应");
logDto.setResult(1);
//业务处理
@@ -193,18 +182,18 @@ public class MqttMessageHandler {
reqAndResParam.setExpire(-1);
String version = csTopicService.getVersion(nDid);
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
} else {
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
csLogsFeignClient.addUserLog(logDto);
log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
logMessageTemplate.sendMember(logDto);
//log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
}
} else {
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.REGISTER_RESPONSE_ERROR.getMessage());
csLogsFeignClient.addUserLog(logDto);
log.info(AccessResponseEnum.REGISTER_RESPONSE_ERROR.getMessage());
logMessageTemplate.sendMember(logDto);
//log.info(AccessResponseEnum.REGISTER_RESPONSE_ERROR.getMessage());
}
}
@@ -220,20 +209,15 @@ public class MqttMessageHandler {
@Transactional(rollbackFor = Exception.class)
public void devAccessOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){
//日志实体
DeviceLogDTO logDto = new DeviceLogDTO();
try{
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
} catch (Exception e) {
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
}
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统");
logDto.setLoginName("系统");
logDto.setResult(1);
//业务处理
Gson gson = new Gson();
ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class);
redisUtil.saveByKeyWithExpire("devResponse",res.getCode(),5L);
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())) {
switch (res.getType()){
/**
* 装置类型模板应答
@@ -242,15 +226,17 @@ public class MqttMessageHandler {
* 3.平台端需读取装置的DevMod来判断网关支持的设备模板包含设备型号和模板版本根据app提交的接入子设备DID匹配数据模板型号及版本生成DevCfg下发给网关网关根据下发信息生成就地设备点表。
*/
case 4611:
log.info("{},装置模板应答,应答code {}",nDid,res.getCode());
logDto.setOperate("装置模板应答");
//log.info("{},装置模板应答,应答code {}",nDid,res.getCode());
ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class);
List<DevModInfoDto> list = modelDto.getMsg().getDevMod();
List<DevCfgDto> list2 = modelDto.getMsg().getDevCfg();
if (CollectionUtils.isEmpty(list)){
log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage());
if (CollectionUtils.isEmpty(list)) {
//log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage());
logDto.setOperate("查看装置端模板报文数据");
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
//有异常删除缓存的模板信息
redisUtil.delete(AppRedisKey.MODEL + nDid);
throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR);
@@ -267,11 +253,11 @@ public class MqttMessageHandler {
CsModelDto csModelDto = new CsModelDto();
CsDevModelPO po = devModelFeignClient.findModel(item.getDevType(),item.getVersionNo(),item.getVersionDate()).getData();
if (Objects.isNull(po)){
log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage());
logDto.setOperate(nDid + "模板缺失");
//log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage());
logDto.setOperate(nDid + "查询系统中是否存在模板");
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.MODEL_NO_FIND.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
//有异常删除缓存的模板信息
redisUtil.delete(AppRedisKey.MODEL + nDid);
throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND);
@@ -279,9 +265,10 @@ public class MqttMessageHandler {
if (Objects.equals(po.getType(),0)){
List<CsDataSet> dataSetList = dataSetFeignClient.getModuleDataSet(po.getId()).getData();
if (CollectionUtils.isEmpty(dataSetList)){
logDto.setOperate("查看APF模块个数");
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.MODULE_NUMBER_IS_NULL.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
//有异常删除缓存的模板信息
redisUtil.delete(AppRedisKey.MODEL + nDid);
throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL);
@@ -302,10 +289,11 @@ public class MqttMessageHandler {
List<CsLineModel> lineList = csLineModelService.getMonitorNumByModelId(modelId);
String key = AppRedisKey.LINE + nDid;
redisUtil.saveByKeyWithExpire(key,lineList,600L);
logMessageTemplate.sendMember(logDto);
break;
case 4613:
logDto.setOperate(nDid + "设备接入");
log.info("{},收到接入应答响应,应答code {}",nDid,res.getCode());
//log.info("{},收到接入应答响应,应答code {}",nDid,res.getCode());
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
int mid = 1;
//修改装置状态
@@ -333,13 +321,13 @@ public class MqttMessageHandler {
//录波任务倒计时
redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L);
} else {
log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
//log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
throw new BusinessException(AccessResponseEnum.ACCESS_RESPONSE_ERROR);
}
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
break;
case 4614:
RspDataDto rspDataDto = JSON.parseObject(JSON.toJSONString(res.getMsg()), RspDataDto.class);
@@ -371,11 +359,13 @@ public class MqttMessageHandler {
csSoftInfoFeignClient.removeSoftInfo(soft);
}
equipmentFeignClient.updateSoftInfo(nDid,csSoftInfoPo.getId());
logMessageTemplate.sendMember(logDto);
break;
case 2:
List<RspDataDto.LdevInfo> devInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class);
if (CollectionUtil.isNotEmpty(devInfo)){
if (Objects.equals(res.getDid(),1)){
logDto.setOperate(nDid + "更新APF容量");
List<CsDevCapacityPO> list3 = new ArrayList<>();
boolean hasZeroClDid = devInfo.stream().anyMatch(item -> item.getClDid() == 0);
//治理设备
@@ -411,19 +401,23 @@ public class MqttMessageHandler {
});
}
}
logMessageTemplate.sendMember(logDto);
break;
case 15:
logDto.setOperate(nDid + "更新设备软件信息");
JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(res));
AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject, AppAutoDataMessage.class);
appAutoDataMessage.setId(nDid);
rtFeignClient.apfRtAnalysis(appAutoDataMessage);
logMessageTemplate.sendMember(logDto);
break;
case 48:
logDto.setUserName("运维管理员");
logDto.setUserIndex("系统");
logDto.setOperate("监测点:" + (nDid + rspDataDto.getClDid()) + "询问项目列表");
List<RspDataDto.ProjectInfo> projectInfoList = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.ProjectInfo.class);
String key3 = AppRedisKey.PROJECT_INFO + nDid + rspDataDto.getClDid();
redisUtil.saveByKeyWithExpire(key3,projectInfoList,60L);
logMessageTemplate.sendMember(logDto);
break;
default:
break;
@@ -441,10 +435,11 @@ public class MqttMessageHandler {
}
} else {
String result = getEnum(res.getCode());
log.info(result);
//log.info(result);
logDto.setOperate("装置响应");
logDto.setResult(0);
logDto.setFailReason(result);
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
throw new BusinessException(result);
}
}

View File

@@ -3,10 +3,7 @@ package com.njcn.access.listener;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.utils.RedisSetUtil;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.csdevice.api.*;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.rt.pojo.dto.BaseRealDataSet;
import lombok.extern.slf4j.Slf4j;
@@ -27,27 +24,12 @@ import java.util.Set;
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
@Resource
private ICsEquipmentDeliveryService csEquipmentDeliveryService;
@Resource
private CsLogsFeignClient csLogsFeignClient;
@Resource
private CsLedgerFeignClient csLedgerFeignclient;
@Resource
private EquipmentFeignClient equipmentFeignClient;
@Resource
private RedisUtil redisUtil;
@Resource
private SendMessageUtil sendMessageUtil;
@Resource
private CsCommunicateFeignClient csCommunicateFeignClient;
@Resource
private MqttPublisher publisher;
@Resource
private RedisSetUtil redisSetUtil;
@Resource
private DeviceMessageFeignClient deviceMessageFeignClient;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);

View File

@@ -4,11 +4,14 @@ import cn.hutool.core.collection.CollUtil;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.mq.message.LogMessage;
import com.njcn.mq.template.LogMessageTemplate;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.DictTreeFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.SysDicTreePO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
@@ -17,8 +20,10 @@ import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入
@@ -36,7 +41,8 @@ public class AccessApplicationRunner implements ApplicationRunner {
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
private final ICsTopicService csTopicService;
private final CsDeviceServiceImpl csDeviceService;
private final DictTreeFeignClient dictTreeFeignClient;
private final LogMessageTemplate logMessageTemplate;
private final ChannelObjectUtil channelObjectUtil;
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static final long ACCESS_TIME = 60L;
@@ -48,6 +54,13 @@ public class AccessApplicationRunner implements ApplicationRunner {
log.info("系统重启,所有符合条件的装置发起接入!");
List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getOnlineDev();
if (CollUtil.isNotEmpty(list)) {
//获取字典数据
List<SysDicTreePO> dictTreeKey = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DICT_TREE), SysDicTreePO.class);
Map<String, SysDicTreePO> dictTreeMap = dictTreeKey.stream().collect(Collectors.toMap(SysDicTreePO::getId, item -> item));
//获取主题版本信息
List<String> nDidIds = list.stream().map(CsEquipmentDeliveryPO::getNdid).collect(Collectors.toList());
Map<String,String> topicVersions = csTopicService.getVersion(nDidIds);
ExecutorService executor = Executors.newFixedThreadPool(10);
// 将任务平均分配给10个子列表
List<List<CsEquipmentDeliveryPO>> subLists = new ArrayList<>();
@@ -65,7 +78,7 @@ public class AccessApplicationRunner implements ApplicationRunner {
futures.add(executor.submit(new Callable<Void>() {
@Override
public Void call() {
accessDev(subLists.get(index));
accessDev(subLists.get(index), dictTreeMap, topicVersions);
return null;
}
}));
@@ -86,25 +99,48 @@ public class AccessApplicationRunner implements ApplicationRunner {
scheduler.shutdown();
}
public void accessDev(List<CsEquipmentDeliveryPO> list) {
public void accessDev(List<CsEquipmentDeliveryPO> list, Map<String, SysDicTreePO> dictTreeMap, Map<String,String> topicVersions) {
if (CollUtil.isNotEmpty(list)) {
List<String> successIds = new ArrayList<>();
List<String> failIds = new ArrayList<>();
try {
list.forEach(item->{
//System.out.println(Thread.currentThread().getName() + ": reboot : nDid : " + item.getNdid());
//判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入
String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode();
if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(),1)) {
if (Objects.equals(dictTreeMap.get(item.getDevType()).getCode(), DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(),1)) {
//csDeviceService.wlDevRegister(item.getNdid());
log.info("请先手动注册、接入");
} else {
String version = csTopicService.getVersion(item.getNdid());
String version = topicVersions.get(item.getNdid());
if (Objects.isNull(version)) {
version = "V1";
}
csDeviceService.autoAccess(item.getNdid(),version,1);
boolean result = csDeviceService.autoAccess(item.getNdid(),version,1);
if (result) {
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1);
successIds.add(item.getNdid());
} else {
failIds.add(item.getNdid());
}
}
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1);
});
if (CollUtil.isNotEmpty(successIds)) {
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统首次启动");
logDto.setLoginName("系统首次启动");
logDto.setResult(1);
logDto.setOperate(String.join(",", successIds) + "装置接入");
logMessageTemplate.sendMember(logDto);
}
if (CollUtil.isNotEmpty(failIds)) {
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统首次启动");
logDto.setLoginName("系统首次启动");
logDto.setResult(0);
logDto.setOperate(String.join(",", failIds) + "装置接入");
logDto.setFailReason("装置不在线或者系统询问装置模板信息,装置未响应");
logMessageTemplate.sendMember(logDto);
}
} catch (Exception e) {
log.error(e.getMessage());
}

View File

@@ -4,11 +4,14 @@ import cn.hutool.core.collection.CollUtil;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.mq.message.LogMessage;
import com.njcn.mq.template.LogMessageTemplate;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.DictTreeFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.SysDicTreePO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
@@ -17,8 +20,10 @@ import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @author xy
@@ -34,7 +39,8 @@ public class AutoAccessTimer implements ApplicationRunner {
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
private final ICsTopicService csTopicService;
private final CsDeviceServiceImpl csDeviceService;
private final DictTreeFeignClient dictTreeFeignClient;
private final ChannelObjectUtil channelObjectUtil;
private final LogMessageTemplate logMessageTemplate;
@Override
public void run(ApplicationArguments args) {
@@ -87,6 +93,13 @@ public class AutoAccessTimer implements ApplicationRunner {
log.info("轮询定时任务执行中!");
List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getOfflineDev();
if (CollUtil.isNotEmpty(list)) {
//获取字典数据
List<SysDicTreePO> dictTreeKey = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DICT_TREE), SysDicTreePO.class);
Map<String, SysDicTreePO> dictTreeMap = dictTreeKey.stream().collect(Collectors.toMap(SysDicTreePO::getId, item -> item));
//获取主题版本信息
List<String> nDidIds = list.stream().map(CsEquipmentDeliveryPO::getNdid).collect(Collectors.toList());
Map<String,String> topicVersions = csTopicService.getVersion(nDidIds);
ExecutorService executor = Executors.newFixedThreadPool(10);
try {
List<List<CsEquipmentDeliveryPO>> subLists = CollUtil.split(list, 10);
@@ -94,7 +107,7 @@ public class AutoAccessTimer implements ApplicationRunner {
for (List<CsEquipmentDeliveryPO> subList : subLists) {
futures.add(executor.submit(() -> {
try {
accessDevSafely(subList); // 使用安全版本
accessDevSafely(subList,dictTreeMap,topicVersions);
} catch (Exception e) {
log.error("处理设备子列表异常", e);
}
@@ -125,40 +138,55 @@ public class AutoAccessTimer implements ApplicationRunner {
}
//安全的accessDev版本
private void accessDevSafely(List<CsEquipmentDeliveryPO> list) {
private void accessDevSafely(List<CsEquipmentDeliveryPO> list, Map<String, SysDicTreePO> dictTreeMap, Map<String,String> topicVersions) {
if (CollUtil.isNotEmpty(list)) {
List<String> successIds = new ArrayList<>();
List<String> failIds = new ArrayList<>();
for (CsEquipmentDeliveryPO item : list) {
try {
processSingleDevice(item);
if (Objects.equals(dictTreeMap.get(item.getDevType()).getCode(), DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) {
log.info("设备 {} 需要手动注册、接入", item.getNdid());
} else {
String version = topicVersions.get(item.getNdid());
if (Objects.isNull(version)) {
version = "V1";
}
// 使用try-catch确保单个设备失败不影响其他设备
try {
boolean success = csDeviceService.autoAccess2(item.getNdid(), version, 1);
if (success) {
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1);
successIds.add(item.getNdid());
} else {
log.warn("设备 {} 接入失败", item.getNdid());
failIds.add(item.getNdid());
}
} catch (Exception e) {
log.error("设备 {} 接入异常: {}", item.getNdid(), e.getMessage());
}
}
} catch (Exception e) {
log.error("处理设备 {} 失败: {}", item.getNdid(), e.getMessage());
}
}
}
}
private void processSingleDevice(CsEquipmentDeliveryPO item) {
//System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid());
String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode();
if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) {
log.info("设备 {} 需要手动注册、接入", item.getNdid());
} else {
String version = csTopicService.getVersion(item.getNdid());
if (Objects.isNull(version)) {
version = "V1";
if (CollUtil.isNotEmpty(successIds)) {
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统首次启动");
logDto.setLoginName("系统首次启动");
logDto.setResult(1);
logDto.setOperate(String.join(",", successIds) + "装置接入");
logMessageTemplate.sendMember(logDto);
}
// 使用try-catch确保单个设备失败不影响其他设备
try {
boolean success = csDeviceService.autoAccess2(item.getNdid(), version, 1);
if (success) {
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1);
} else {
log.warn("设备 {} 接入失败", item.getNdid());
}
} catch (Exception e) {
log.error("设备 {} 接入异常: {}", item.getNdid(), e.getMessage());
if (CollUtil.isNotEmpty(failIds)) {
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统首次启动");
logDto.setLoginName("系统首次启动");
logDto.setResult(0);
logDto.setOperate(String.join(",", failIds) + "装置接入");
logDto.setFailReason("装置不在线或者系统询问装置模板信息,装置未响应");
logMessageTemplate.sendMember(logDto);
}
}
}
}
}

View File

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.access.pojo.po.CsTopic;
import java.util.List;
import java.util.Map;
/**
* <p>
@@ -28,5 +29,7 @@ public interface ICsTopicService extends IService<CsTopic> {
*/
String getVersion(String nDid);
Map<String,String> getVersion(List<String> list);
void deleteByNDid(String nDid);
}

View File

@@ -22,13 +22,18 @@ import com.njcn.access.utils.CRC32Utils;
import com.njcn.access.utils.JsonUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DevModelFeignClient;
import com.njcn.csdevice.api.DevModelRelationFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.enums.AlgorithmResponseEnum;
import com.njcn.csdevice.pojo.param.CsDevModelAddParm;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsDataSet;
import com.njcn.csdevice.pojo.po.CsDevModelPO;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.mq.message.LogMessage;
import com.njcn.mq.template.LogMessageTemplate;
import com.njcn.oss.constant.OssPath;
import com.njcn.oss.utils.FileStorageUtil;
import com.njcn.redis.pojo.enums.AppRedisKey;
@@ -74,7 +79,6 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
private final ICsLineModelService csLineModelService;
private final ICsGroupService csGroupService;
private final ICsGroArrService csGroArrService;
private final CsLogsFeignClient csLogsFeignClient;
private final EleWaveFeignClient waveFeignClient;
private final DictTreeFeignClient dictTreeFeignClient;
private final MqttPublisher publisher;
@@ -83,13 +87,14 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
private final EquipmentFeignClient eequipmentFeignClient;
private final DevModelRelationFeignClient devModelRelationFeignClient;
private final CsLineFeignClient csLineFeignClient;
private final LogMessageTemplate logMessageTemplate;
@Override
@Transactional(rollbackFor = {Exception.class})
public void addModel(MultipartFile file) {
//日志实体
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
LogMessage logDto = new LogMessage();
logDto.setUserIndex(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setResult(1);
String json = null;
@@ -139,11 +144,11 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
}
//5.清空模板缓存
redisUtil.deleteKeysByString(AppRedisKey.DEV_MODEL);
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
} catch (Exception e) {
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.MODEL_ANALYSIS_ERROR.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
throw new BusinessException(e.getMessage());
}
}
@@ -152,8 +157,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
@Transactional(rollbackFor = {Exception.class})
public void addDict(MultipartFile file) {
//日志实体
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
LogMessage logDto = new LogMessage();
logDto.setUserIndex(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setResult(1);
String json = null;
@@ -163,11 +168,11 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
TemplateDto templateDto = gson.fromJson(json, TemplateDto.class);
logDto.setOperate(templateDto.getDevType() + "录入通用字典");
analysisDict(templateDto);
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
} catch (Exception e) {
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.DICT_ANALYSIS_ERROR.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
throw new BusinessException(AccessResponseEnum.DICT_ANALYSIS_ERROR);
}
}
@@ -201,7 +206,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
//发送数据给前端
String json = "{fileName:"+file.getOriginalFilename()+",allStep:"+times+",nowStep:"+i+"}";
publisher.send("/Web/Progress/" + id, new Gson().toJson(json), 1, false);
DeviceLogDTO logDto = new DeviceLogDTO();
LogMessage logDto = new LogMessage();
byte[] lsBytes;
if (length > 50*1024) {
lsBytes = Arrays.copyOfRange(bytes, (i - 1) * cap, i * cap);
@@ -227,25 +232,26 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
//判断是否重发
sendNextStep(logDto,path,file,bytes.length,lsBytes,(i-1)*cap,version,id,i,hexString,true);
}
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
}
} else {
String json = "{fileName:"+file.getOriginalFilename()+",allStep:\""+1+"\",nowStep:"+1+"}";
publisher.send("/Web/Progress", new Gson().toJson(json), 1, false);
ReqAndResDto.Req req = getPojo(1,path,file,length,bytes,0,hexString);
publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false);
DeviceLogDTO logDto = new DeviceLogDTO();
LogMessage logDto = new LogMessage();
logDto.setOperate(id + "系统上送文件,当前文件只有1帧");
logDto.setResult(1);
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
//判断是否重发
sendNextStep(logDto,path,file,length,bytes,0,version,id,1,hexString,false);
}
} catch (Exception e) {
DeviceLogDTO logDto = new DeviceLogDTO();
LogMessage logDto = new LogMessage();
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.UPLOAD_ERROR.getMessage());
csLogsFeignClient.addUserLog(logDto);
logDto.setOperate("系统上传文件");
logDto.setFailReason(e.getMessage());
logMessageTemplate.sendMember(logDto);
throw new BusinessException(AccessResponseEnum.UPLOAD_ERROR);
}
}
@@ -279,7 +285,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
/**
* 根据装置响应来判断发送的内容
*/
public void sendNextStep(DeviceLogDTO logDto, String path, MultipartFile file, int length, byte[] bytes, Integer offset, String version, String id, int mid, String fileCheck, boolean result) {
public void sendNextStep(LogMessage logDto, String path, MultipartFile file, int length, byte[] bytes, Integer offset, String version, String id, int mid, String fileCheck, boolean result) {
try {
for (int i = 0; i < 30; i++) {
if (result) {
@@ -303,16 +309,17 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false);
logDto.setOperate(id + "系统上送文件,装置响应失败,重新发送,这是第" + (i+1) + "");
logDto.setResult(1);
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
}
}
}
} catch (InterruptedException e) {
assert logDto != null;
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.RELOAD_UPLOAD_ERROR.getMessage());
csLogsFeignClient.addUserLog(logDto);
throw new RuntimeException(e);
logDto.setOperate("平台重新上送文件");
logDto.setFailReason(e.getMessage());
logMessageTemplate.sendMember(logDto);
throw new BusinessException(AccessResponseEnum.RELOAD_UPLOAD_ERROR);
}
}
@@ -322,8 +329,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
*/
private CsDevModelPO addCsDevModel(TemplateDto templateDto, String filePath){
//日志实体
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
LogMessage logDto = new LogMessage();
logDto.setUserIndex(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setOperate("新增"+templateDto.getDevType()+"模板数据");
logDto.setResult(1);
@@ -331,23 +338,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
if (!Objects.isNull(po)){
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.MODEL_REPEAT.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
throw new BusinessException(AccessResponseEnum.MODEL_REPEAT);
}
// CsDevModelPO model = new CsDevModelPO();
// model.setDevTypeName(templateDto.getDevType());
// model.setName(templateDto.getDevType());
// model.setVersionNo(templateDto.getVersion());
// model.setVersionDate(Date.valueOf(templateDto.getTime()));
// model.setFilePath(filePath);
// model.setStatus ("1");
// //fixme 先用数据类型来区分模板的类型
// if (templateDto.getDataList().contains("Apf") || templateDto.getDataList().contains("Dvr")){
// model.setType(0);
// } else {
// model.setType(1);
// }
// csDevModelMapper.insert(model);
CsDevModelAddParm csDevModelAddParm = new CsDevModelAddParm();
csDevModelAddParm.setDevTypeName(templateDto.getDevType());
csDevModelAddParm.setName(templateDto.getDevType());
@@ -361,7 +354,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
csDevModelAddParm.setType(1);
}
CsDevModelPO model = devModelFeignClient.addDevModel(csDevModelAddParm).getData();
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
return model;
}

View File

@@ -27,6 +27,8 @@ import com.njcn.csdevice.pojo.param.*;
import com.njcn.csdevice.pojo.po.*;
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.csharmonic.api.CsHarmonicPlanLineFeignClient;
import com.njcn.mq.message.LogMessage;
import com.njcn.mq.template.LogMessageTemplate;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.DicDataFeignClient;
@@ -75,7 +77,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
private final MqttUtil mqttUtil;
private final ICsTopicService csTopicService;
private final DicDataFeignClient dicDataFeignClient;
private final CsLogsFeignClient csLogsFeignClient;
private final ProcessFeignClient processFeignClient;
private final CsLinePOService csLinePOService;
private final CsDeviceUserPOService csDeviceUserPOService;
@@ -89,56 +90,61 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
private final AppProjectFeignClient appProjectFeignClient;
private final DeviceMessageFeignClient deviceMessageFeignClient;
private final CsHarmonicPlanLineFeignClient csHarmonicPlanLineFeignClient;
private final LogMessageTemplate logMessageTemplate;
@Override
@Transactional(rollbackFor = {Exception.class})
public void devRegister(String nDid,Integer type) {
//日志实体
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setOperate("直连设备"+nDid+"注册");
logDto.setResult(1);
LogMessage message = new LogMessage();
message.setUserIndex(RequestUtil.getUserNickname());
message.setLoginName(RequestUtil.getUsername());
message.setResult(1);
//1.判断客户端是否在线
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (!mqttClient){
message.setOperate("直连装置"+nDid+"注册中,判断装置是否连接MQTT服务器");
message.setResult(0);
message.setFailReason(AlgorithmResponseEnum.DEV_OFFLINE.getMessage());
logMessageTemplate.sendMember(message);
throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE);
}
//1.判断nDid是否存在
CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid);
if (Objects.isNull(csEquipmentDeliveryVO.getNdid())){
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.NDID_NO_FIND.getMessage());
csLogsFeignClient.addUserLog(logDto);
message.setOperate("直连装置"+nDid+"注册中,判断nDid是否存在");
message.setResult(0);
message.setFailReason(AccessResponseEnum.NDID_NO_FIND.getMessage());
logMessageTemplate.sendMember(message);
throw new BusinessException(AccessResponseEnum.NDID_NO_FIND);
}
//2.判断设备是否是直连设备
//2.判断装置是否是直连装置
SysDicTreePO sysDicTreePo = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevType()).getData();
if (Objects.isNull(sysDicTreePo)){
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.DEV_NOT_FIND.getMessage());
csLogsFeignClient.addUserLog(logDto);
message.setOperate("直连装置"+nDid+"注册中,判断装置型号");
message.setResult(0);
message.setFailReason(AccessResponseEnum.DEV_NOT_FIND.getMessage());
logMessageTemplate.sendMember(message);
throw new BusinessException(AccessResponseEnum.DEV_NOT_FIND);
}
String code = sysDicTreePo.getCode();
if (!Objects.equals(code, DicDataEnum.CONNECT_DEV.getCode())){
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.DEV_IS_NOT_ZL.getMessage());
csLogsFeignClient.addUserLog(logDto);
message.setOperate("直连装置"+nDid+"注册中,判断装置是否为直连装置");
message.setResult(0);
message.setFailReason(AccessResponseEnum.DEV_IS_NOT_ZL.getMessage());
logMessageTemplate.sendMember(message);
throw new BusinessException(AccessResponseEnum.DEV_IS_NOT_ZL);
}
//3.判断是否已经注册过
if (!Objects.isNull(csEquipmentDeliveryVO.getNdid()) && Objects.equals(type,csEquipmentDeliveryVO.getProcess()) && Objects.equals(AccessEnum.ACCESS.getCode(),csEquipmentDeliveryVO.getStatus())){
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.NDID_SAME_STEP.getMessage());
csLogsFeignClient.addUserLog(logDto);
message.setOperate("直连装置"+nDid+"注册中,判断装置是否已经注册");
message.setResult(0);
message.setFailReason(AccessResponseEnum.NDID_SAME_STEP.getMessage());
logMessageTemplate.sendMember(message);
throw new BusinessException(AccessResponseEnum.NDID_SAME_STEP);
}
//4.判断客户端是否在线
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (!mqttClient){
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.MISSING_CLIENT.getMessage());
csLogsFeignClient.addUserLog(logDto);
throw new BusinessException(AccessResponseEnum.MISSING_CLIENT);
}
//5.判断当前流程是否是合法的
//4.判断当前流程是否是合法的
//note(重要说明) 这边流程原先是三个阶段,在实际应用中嫌麻烦,简化为一个流程
// if (csEquipmentDeliveryVO.getProcess() > type){
// logDto.setResult(0);
@@ -149,30 +155,32 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
// logDto.setFailReason(AccessResponseEnum.PROCESS_MISSING_ERROR.getMessage());
// throw new BusinessException(AccessResponseEnum.PROCESS_MISSING_ERROR);
// }
//6.询问设备支持的主题信息
//5.询问装置支持的主题信息
//将支持的主题入库
askTopic(nDid);
//7.MQTT询问装置用的模板并判断库中是否存在模板
//存在则建立关系;不存在则告警出来
SysDicTreePO dictData = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevModel()).getData();
if (Objects.isNull(dictData)){
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.DEV_MODEL_NOT_FIND.getMessage());
csLogsFeignClient.addUserLog(logDto);
message.setOperate("直连装置"+nDid+"注册中,判断系统中是否存在装置型号");
message.setResult(0);
message.setFailReason(AccessResponseEnum.DEV_MODEL_NOT_FIND.getMessage());
logMessageTemplate.sendMember(message);
throw new BusinessException(AccessResponseEnum.DEV_MODEL_NOT_FIND);
}
String devModel = dictData.getCode();
zhiLianRegister(nDid,devModel);
csLogsFeignClient.addUserLog(logDto);
message.setOperate("直连装置"+nDid+"下发注册报文");
logMessageTemplate.sendMember(message);
}
@Override
public Object getModel(String nDid) {
//日志实体
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
LogMessage logDto = new LogMessage();
logDto.setUserIndex(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setOperate("获取"+nDid+"设备模板信息");
logDto.setOperate("获取"+nDid+"装置模板信息");
logDto.setResult(1);
Object model = null;
try {
@@ -182,13 +190,13 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
if (Objects.isNull(model)){
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.MODEL_MISS.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
throw new BusinessException(AccessResponseEnum.MODEL_MISS);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
return model;
}
@@ -197,10 +205,10 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
//fixme 这边事务不起作用,中途出错会导致数据部分录入,再次接入会报主键冲突,所以暂时加了个重置按钮,清空台账数据的
public void devAccess(DevAccessParam devAccessParam) {
//日志实体
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
LogMessage logDto = new LogMessage();
logDto.setUserIndex(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setOperate("设备"+devAccessParam.getNDid()+"接入");
logDto.setOperate("装置"+devAccessParam.getNDid()+"接入");
logDto.setResult(1);
try {
//获取版本
@@ -217,7 +225,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csLedgerParam.setSort(0);
csLedgerService.addLedgerTree(csLedgerParam);
List<CsModelDto> modelId = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + devAccessParam.getNDid()),CsModelDto.class);
//2.新增装置-模板关系、获取电能质量的逻辑设备id
//2.新增装置-模板关系、获取电能质量的逻辑装置id
for (CsModelDto item : modelId) {
CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm();
csDevModelRelationAddParm.setDevId(vo.getId());
@@ -287,7 +295,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
if (CollectionUtil.isNotEmpty(lineList)){
logDto.setResult(0);
logDto.setFailReason(AccessResponseEnum.LINE_POSITION_REPEAT.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
throw new BusinessException(AccessResponseEnum.LINE_POSITION_REPEAT);
}
//删除监测点稳态指标告警的默认指标配置
@@ -310,16 +318,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
po.setSubUserId(RequestUtil.getUserIndex());
po.setDeviceId(vo.getId());
csDeviceUserService.saveBatch(Collections.singletonList(po));
//6.修改装置状态;修改设备接入的工程、项目
//6.修改装置状态;修改装置接入的工程、项目
csEquipmentDeliveryService.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode(),devAccessParam.getEngineeringId(), devAccessParam.getProjectId());
//7.发起自动接入请求
devAccessAskTemplate(devAccessParam.getNDid(),version,1);
//8.删除redis监测点模板信息
redisUtil.delete(AppRedisKey.MODEL + devAccessParam.getNDid());
redisUtil.delete(AppRedisKey.LINE + devAccessParam.getNDid());
//9.存储日志
csLogsFeignClient.addUserLog(logDto);
//10.存储设备调试日志表
//9.存储装置调试日志
CsEquipmentProcessPO csEquipmentProcess = new CsEquipmentProcessPO();
csEquipmentProcess.setDevId(devAccessParam.getNDid());
csEquipmentProcess.setOperator(RequestUtil.getUserIndex());
@@ -330,7 +336,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csEquipmentProcess.setStatus(1);
}
processFeignClient.add(csEquipmentProcess);
//11.这里会出现工程用户接入设备时,如果当前工程用户并没有关注,接入之后应该将用户和工程关联起来
//11.这里会出现工程用户接入装置时,如果当前工程用户并没有关注,接入之后应该将用户和工程关联起来
List<UserVO> users = userFeignClient.getUserVOByIdList(Collections.singletonList(RequestUtil.getUserIndex())).getData();
if (CollectionUtil.isNotEmpty(users)) {
UserVO userVO = users.get(0);
@@ -338,10 +344,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csMarketDataFeignClient.insertData(userVO.getId(), devAccessParam.getEngineeringId());
}
}
//10.存储日志
logMessageTemplate.sendMember(logDto);
} catch (Exception e) {
logDto.setResult(0);
logDto.setFailReason(e.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
throw new BusinessException(CommonResponseEnum.FAIL);
}
}
@@ -367,7 +375,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
//清除关系表
QueryWrapper<CsLedger> csLedgerQueryWrapper = new QueryWrapper<>();
/**
* 删除设备
* 删除装置
*/
csLedgerQueryWrapper.clear();
csLedgerQueryWrapper.eq("id",devId);
@@ -412,16 +420,16 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
@Transactional(rollbackFor = Exception.class)
public String wlDevRegister(String nDid) {
String result = "fail";
// 根据模板接入设备
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
// 根据模板接入装置
LogMessage logDto = new LogMessage();
logDto.setUserIndex(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setOperate("便携式设备"+nDid+"注册、接入");
logDto.setOperate("便携式装置"+nDid+"注册、接入");
logDto.setResult(1);
try {
// 设备状态判断
// 装置状态判断
checkDeviceStatus(nDid);
// 询问设备支持的主题信息,并将支持的主题入库
// 询问装置支持的主题信息,并将支持的主题入库
askAndStoreTopics(nDid);
Thread.sleep(2000);
// MQTT询问装置用的模板并判断库中是否存在模板
@@ -430,10 +438,10 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
List<CsLinePO> csLinePoList = new ArrayList<>();
//1.录入装置台账信息
//note 1、这边发现便携式设备注册时,如果没有工程 项目,后期特殊处理非常的麻烦,这边接入时,先查询工程 项目,如果没有则创建;如果存在则直接使用;
//note 2、查询之前已经接入过的便携式设备,如果存在修改台账信息,添加工程、项目
//note 1、这边发现便携式装置注册时,如果没有工程 项目,后期特殊处理非常的麻烦,这边接入时,先查询工程 项目,如果没有则创建;如果存在则直接使用;
//note 2、查询之前已经接入过的便携式装置,如果存在修改台账信息,添加工程、项目
String projectId = this.autoPortableLedger();
//新增便携式设备
//新增便携式装置
CsLedgerParam csLedgerParam = new CsLedgerParam();
csLedgerParam.setId(vo.getId());
csLedgerParam.setPid(projectId);
@@ -491,11 +499,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
//String version = csTopicService.getVersion(nDid);
String version = "V1";
devAccessAskTemplate(nDid,version,1);
//6.修改流程,便携式设备接入成功即为实际环境
//6.修改流程,便携式装置接入成功即为实际环境
csEquipmentDeliveryService.updateProcessBynDid(nDid,4);
//7.存储日志
csLogsFeignClient.addUserLog(logDto);
//9.删除redis监测点模板信息
//7.删除redis监测点模板信息
redisUtil.delete(AppRedisKey.MODEL + nDid);
redisUtil.delete(AppRedisKey.LINE + nDid);
//判断接入状态
@@ -504,10 +510,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
if (Objects.nonNull(object)) {
result = "success";
}
//8.存储日志
logMessageTemplate.sendMember(logDto);
} catch (Exception e) {
logDto.setResult(0);
logDto.setFailReason(e.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
resetFactory(nDid);
throw new BusinessException(AccessResponseEnum.ACCESS_ERROR);
}
@@ -516,7 +524,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
@Override
public void wlAccess(String nDid) {
//设备状态判断
//装置状态判断
checkDeviceStatus(nDid);
//获取版本
String version = csTopicService.getVersion(nDid);
@@ -550,7 +558,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
param.setSort(Integer.MAX_VALUE);
csProjectPO = appProjectFeignClient.addPortableProject(param).getData();
}
//修改已存在的便携式设备
//修改已存在的便携式装置
csLedgerService.updatePortableLedger(csEngineeringPO.getId(),csProjectPO.getId());
return csProjectPO.getId();
}
@@ -559,16 +567,16 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
@Transactional(rollbackFor = Exception.class)
public String onlineRegister(String projectId,String nDid) {
String result = "fail";
// 根据模板接入设备
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
// 根据模板接入装置
LogMessage logDto = new LogMessage();
logDto.setUserIndex(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setOperate("监测设备"+nDid+"注册、接入");
logDto.setOperate("监测装置"+nDid+"注册、接入");
logDto.setResult(1);
try {
// 设备状态判断
// 装置状态判断
checkDeviceStatus(nDid);
// 询问设备支持的主题信息,并将支持的主题入库
// 询问装置支持的主题信息,并将支持的主题入库
askAndStoreTopics(nDid);
Thread.sleep(2000);
// MQTT询问装置用的模板并判断库中是否存在模板
@@ -577,7 +585,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
List<CsLinePO> csLinePoList = new ArrayList<>();
//1.录入装置台账信息
//新增监测设备
//新增监测装置
CsLedgerParam csLedgerParam = new CsLedgerParam();
csLedgerParam.setId(vo.getId());
csLedgerParam.setPid(projectId);
@@ -646,9 +654,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
devAccessAskTemplate(nDid,version,1);
//6.修改流程,接入成功即为实际环境
csEquipmentDeliveryService.updateProcessBynDid(nDid,4);
//7.存储日志
csLogsFeignClient.addUserLog(logDto);
//9.删除redis监测点模板信息
//7.删除redis监测点模板信息
redisUtil.delete(AppRedisKey.MODEL + nDid);
redisUtil.delete(AppRedisKey.LINE + nDid);
//判断接入状态
@@ -657,10 +663,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
if (Objects.nonNull(object)) {
result = "success";
}
//8.存储日志
logMessageTemplate.sendMember(logDto);
} catch (Exception e) {
logDto.setResult(0);
logDto.setFailReason(e.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
resetFactory(nDid);
throw new BusinessException(e.getMessage());
}
@@ -668,7 +676,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
}
private void checkDeviceStatus(String nDid) {
DeviceLogDTO logDto = createLogDto("当前设备"+nDid+"状态判断");
LogMessage logDto = createLogDto("当前装置"+nDid+"状态判断");
CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid);
if (Objects.isNull(csEquipmentDeliveryVO.getNdid())) {
throwExceptionAndLog(nDid,AccessResponseEnum.NDID_NO_FIND, logDto);
@@ -687,12 +695,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
}
private void askAndStoreTopics(String nDid) {
// 询问设备支持的主题信息
// 询问装置支持的主题信息
this.askTopic(nDid);
}
private void checkDeviceModel(String nDid) {
DeviceLogDTO logDto = createLogDto("MQTT询问装置用的模板并判断库中是否存在模板");
LogMessage logDto = createLogDto("MQTT询问装置用的模板并判断库中是否存在模板");
CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid);
SysDicTreePO dictData = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevModel()).getData();
if (Objects.isNull(dictData)) {
@@ -702,19 +710,19 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
zhiLianRegister(nDid,devModel);
}
private DeviceLogDTO createLogDto(String operate) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
private LogMessage createLogDto(String operate) {
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统");
logDto.setLoginName("系统");
logDto.setOperate(operate);
logDto.setResult(1);
return logDto;
}
private void throwExceptionAndLog(String nDid,AccessResponseEnum responseEnum, DeviceLogDTO logDto) {
private void throwExceptionAndLog(String nDid,AccessResponseEnum responseEnum, LogMessage logDto) {
logDto.setResult(0);
logDto.setFailReason(responseEnum.getMessage());
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
resetFactory(nDid);
throw new BusinessException(responseEnum);
}
@@ -728,13 +736,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
public boolean devAccessAskTemplate(String nDid,String version,Integer mid) {
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统");
logDto.setLoginName("系统");
if (!mqttClient) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(nDid + "接入失败,装置客户端不在线");
csLogsFeignClient.addUserLog(logDto);
logDto.setResult(0);
logDto.setOperate(nDid + "系统向装置询问模板信息");
logDto.setFailReason(AlgorithmResponseEnum.DEV_OFFLINE.getMessage());
logMessageTemplate.sendMember(logDto);
throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE);
}
boolean result = false;
@@ -778,14 +787,16 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
//录波任务倒计时
redisUtil.saveByKeyWithExpire("startFile:" + nDid,null,60L);
result = true;
} else {
logDto.setResult(0);
logDto.setOperate(nDid + "系统向装置询问模板信息");
logDto.setFailReason("装置未应答");
logMessageTemplate.sendMember(logDto);
}
} catch (Exception e) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setResult(0);
logDto.setOperate(nDid + "装置接入失败");
csLogsFeignClient.addUserLog(logDto);
logMessageTemplate.sendMember(logDto);
throw new BusinessException(e.getMessage());
}
return result;
@@ -803,13 +814,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (!mqttClient) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(nDid + "接入失败,装置客户端不在线");
csLogsFeignClient.addUserLog(logDto);
throw new BusinessException(AlgorithmResponseEnum.DEV_OFFLINE);
return result;
}
Map<Integer,String> modelMap = new HashMap<>();
//删除缓存数据
@@ -852,13 +857,13 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
result = true;
}
} catch (Exception e) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(nDid + "装置接入失败");
csLogsFeignClient.addUserLog(logDto);
throw new BusinessException(e.getMessage());
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统首次启动");
logDto.setLoginName("系统首次启动");
logDto.setResult(0);
logDto.setOperate(nDid + "装置接入");
logDto.setFailReason(e.getMessage());
logMessageTemplate.sendMember(logDto);
}
return result;
}
@@ -870,14 +875,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (!mqttClient) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(nDid + "接入失败,装置客户端不在线");
csLogsFeignClient.addUserLog(logDto);
// 改为返回false而不是抛出异常
log.warn("设备 {} 客户端不在线", nDid);
//log.warn("装置 {} 客户端不在线", nDid);
return false;
}
@@ -889,12 +887,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("线程休眠被中断: {}", e.getMessage());
//log.warn("线程休眠被中断: {}", e.getMessage());
return false;
}
List<CsModelDto> modelId = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid), CsModelDto.class);
if (CollUtil.isEmpty(modelId)) {
log.warn("设备 {} 未获取到模板信息", nDid);
//log.warn("装置 {} 未获取到模板信息", nDid);
return false;
}
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
@@ -916,13 +914,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())), 1, false);
result = true;
} catch (Exception e) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(nDid + "装置接入失败");
csLogsFeignClient.addUserLog(logDto);
log.error("设备 {} 接入失败: {}", nDid, e.getMessage());
LogMessage logDto = new LogMessage();
logDto.setUserIndex("定时任务");
logDto.setLoginName("定时任务");
logDto.setResult(0);
logDto.setOperate(nDid + "装置接入");
logDto.setFailReason(e.getMessage());
logMessageTemplate.sendMember(logDto);
//log.error("装置 {} 接入失败: {}", nDid, e.getMessage());
}
return result;
}
@@ -953,7 +952,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
}
/**
* 平台对设备发起主题询问命令
* 平台对装置发起主题询问命令
*/
public void askTopic(String nDid) {
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
@@ -967,7 +966,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
}
/**
* 平台对设备发起注册命令
* 平台对装置发起注册命令
* @param nDid
* @param devType
*/

View File

@@ -9,15 +9,16 @@ import com.njcn.access.mapper.CsEquipmentDeliveryMapper;
import com.njcn.access.pojo.param.DeviceStatusParam;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.utils.MqttUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.csdevice.api.CsLogsFeignClient;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -35,8 +36,6 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
private final MqttUtil mqttUtil;
private final CsLogsFeignClient csLogsFeignClient;
@Override
public void updateStatusBynDid(String nDid,Integer status,String engineeringId, String projectId) {
LambdaUpdateWrapper<CsEquipmentDeliveryPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
@@ -110,13 +109,6 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (mqttClient) {
result.add(item);
} else {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(item.getNdid() + "接入失败,装置客户端不在线");
csLogsFeignClient.addUserLog(logDto);
}
});
}
@@ -147,13 +139,6 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (mqttClient) {
result.add(item);
} else {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
logDto.setResult(1);
logDto.setOperate(item.getNdid() + "接入失败,装置客户端不在线");
csLogsFeignClient.addUserLog(logDto);
}
});
}

View File

@@ -8,8 +8,10 @@ import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsHeartService;
import com.njcn.access.service.IHeartbeatService;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.api.CsCommunicateFeignClient;
import com.njcn.csdevice.api.CsLedgerFeignClient;
import com.njcn.csdevice.api.DeviceMessageFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.param.DeviceMessageParam;
import com.njcn.csdevice.pojo.dto.DevDetailDTO;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
@@ -17,8 +19,6 @@ import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.mq.message.HeartbeatTimeoutMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.user.api.AppUserFeignClient;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -45,20 +45,12 @@ public class CsHeartServiceImpl implements ICsHeartService {
@Resource
private ICsEquipmentDeliveryService csEquipmentDeliveryService;
@Resource
private CsLogsFeignClient csLogsFeignClient;
@Resource
private EquipmentFeignClient equipmentFeignClient;
@Resource
private SendMessageUtil sendMessageUtil;
@Resource
private CsLedgerFeignClient csLedgerFeignclient;
@Resource
private AppUserFeignClient appUserFeignClient;
@Resource
private CsDeviceUserFeignClient csDeviceUserFeignClient;
@Resource
private UserFeignClient userFeignClient;
@Resource
private IHeartbeatService heartbeatService;
@Resource
private CsCommunicateFeignClient csCommunicateFeignClient;
@@ -79,14 +71,10 @@ public class CsHeartServiceImpl implements ICsHeartService {
}
private void handleDeviceOffline(String nDid) {
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName("运维管理员");
logDto.setLoginName("njcnyw");
//装置下线
csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode());
//装置调整为注册状态
csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode(),null,null);
logDto.setOperate(nDid +"装置离线");
sendMessage(nDid);
//记录装置掉线时间
PqsCommunicateDto dto = new PqsCommunicateDto();
@@ -95,7 +83,6 @@ public class CsHeartServiceImpl implements ICsHeartService {
dto.setType(0);
dto.setDescription("通讯中断");
csCommunicateFeignClient.insertion(dto);
csLogsFeignClient.addUserLog(logDto);
//清空缓存
redisUtil.deleteKeysByString(AppRedisKey.LINE_POSITION+nDid);
}
@@ -106,7 +93,6 @@ public class CsHeartServiceImpl implements ICsHeartService {
NoticeUserDto dto = sendOffLine(nDid);
if (CollectionUtil.isNotEmpty(dto.getPushClientId())) {
sendMessageUtil.sendEventToUser(dto);
addLogs(dto);
}
}
}
@@ -135,12 +121,4 @@ public class CsHeartServiceImpl implements ICsHeartService {
}
return dto;
}
private void addLogs(NoticeUserDto noticeUserDto) {
DeviceLogDTO dto = new DeviceLogDTO();
dto.setUserName("运维管理员");
dto.setLoginName("njcnyw");
dto.setOperate(noticeUserDto.getContent());
csLogsFeignClient.addUserLog(dto);
}
}

View File

@@ -8,7 +8,10 @@ import com.njcn.access.pojo.po.CsTopic;
import com.njcn.access.service.ICsTopicService;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* <p>
@@ -42,6 +45,17 @@ public class CsTopicServiceImpl extends ServiceImpl<CsTopicMapper, CsTopic> impl
return version;
}
@Override
public Map<String, String> getVersion(List<String> list) {
Map<String, String> map = new HashMap<>();
List<CsTopic> topicList = this.lambdaQuery().in(CsTopic::getNDid,list).isNotNull(CsTopic::getVersion).list();
Map<String, List<CsTopic>> topicMap = topicList.stream().collect(Collectors.groupingBy(CsTopic::getNDid));
topicMap.forEach((key,value)->{
map.put(key,value.get(0).getVersion());
});
return map;
}
@Override
public void deleteByNDid(String nDid) {
LambdaQueryWrapper<CsTopic> lambdaQueryWrapper = new LambdaQueryWrapper<>();

View File

@@ -20,6 +20,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>cs-system-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-web</artifactId>

View File

@@ -0,0 +1,138 @@
package com.njcn.message.consumer;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.cssystem.api.CsLogsFeignClient;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.mq.constant.BusinessTopic;
import com.njcn.mq.constant.MessageStatus;
import com.njcn.mq.message.LogMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/11 15:32
*/
@Service
@RocketMQMessageListener(
topic = BusinessTopic.ZL_LOG_TOPIC,
consumerGroup = BusinessTopic.ZL_LOG_TOPIC,
selectorExpression = BusinessTopic.LogTag.LOG_TAG,
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class LogConsumer extends EnhanceConsumerMessageHandler<LogMessage> implements RocketMQListener<LogMessage> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Resource
private CsLogsFeignClient csLogsFeignClient;
@Override
protected void handleMessage(LogMessage logMessage) {
DeviceLogDTO deviceLogDTO = new DeviceLogDTO();
BeanUtils.copyProperties(logMessage,deviceLogDTO);
csLogsFeignClient.addUserLog(deviceLogDTO);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(LogMessage message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis 5分钟避免重复消费
*/
@Override
protected void consumeSuccess(LogMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, 5 * 60L);
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(LogMessage message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
/***
* 调用父类handler处理消息的元信息
*/
@Override
public void onMessage(LogMessage appAutoDataMessage) {
super.dispatchMessage(appAutoDataMessage);
}
}