feat(access): 优化设备接入流程并增加监测点管理功能

- 新增DEV_DATA_ERROR响应枚举处理装置端监测点信息获取失败情况
- 集成CsLineFeignClient服务实现线路信息精确获取,替代原有字符解析方式
- 删除无用的LogMessageTemplate依赖和objectToMap工具方法
- 添加CsDeviceRegistryFeignClient支持设备注册表管理
- 重构CsDeviceServiceImpl中的设备接入逻辑,优化监测点ID分配机制
- 新增accessByUpdateMac接口支持设备MAC地址变更后的重新接入
- 完善设备监测点信息获取流程,增加数据去重和异常处理机制
- 添加稳态事件指标配置功能,集成谐波方案管理
- 优化MQTT消息处理器中的设备信息更新逻辑
- 增强设备软件信息解析的日期格式兼容性
- 优化设备容量和台账信息更新的数据持久化流程
This commit is contained in:
xy
2026-06-22 13:49:11 +08:00
parent 7ad8f5f80c
commit ad1e051a94
12 changed files with 438 additions and 142 deletions

View File

@@ -75,6 +75,7 @@ public enum AccessResponseEnum {
FILE_CHECK_ERROR("A0312","文件校验码不一致!"),
CLD_MODEL_EXIST("A0313","云前置模板已存在,请先删除再录入!"),
DEV_DATA_ERROR("A0313","系统端询问装置端监测点信息失败,无法生成监测点!"),
/**
* A3001 ~ A3099 用于zlevent模块的枚举

View File

@@ -5,7 +5,8 @@ import com.njcn.redis.utils.RedisUtil;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
/**
* @author xy
@@ -51,22 +52,4 @@ public class ChannelObjectUtil {
public Object getDeviceMid(String nDid) {
return redisUtil.getObjectByKey(AppRedisKey.DEVICE_MID + nDid);
}
public Map<String, List<String>> objectToMap(Object obj) {
// 创建并填充 Map
Map<String, List<String>> resultMap = new HashMap<>();
String json = obj.toString();
// 移除首尾的 {}
json = json.substring(1, json.length() - 1);
// 找到键和值的分隔符位置
int keyEndIndex = json.indexOf("=[");
String key = json.substring(0, keyEndIndex);
String valuesStr = json.substring(keyEndIndex + 2, json.length() - 1);
// 将值字符串分割成列表
String[] valuesArray = valuesStr.split(", ");
List<String> valuesList = Arrays.asList(valuesArray);
resultMap.put(key, valuesList);
return resultMap;
}
}

View File

@@ -159,5 +159,15 @@ public class CsDeviceController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/accessByUpdateMac")
@ApiOperation("修改mac后手动接入")
@ApiImplicitParam(name = "nDid", value = "设备识别码", required = true)
public HttpResult<String> accessByUpdateMac(@RequestParam("nDid") String nDid){
String methodDescribe = getMethodDescribe("accessByUpdateMac");
String result = csDeviceService.accessByUpdateMac(nDid);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
}

View File

@@ -99,6 +99,7 @@ public class MqttMessageHandler {
private final CsCommunicateFeignClient csCommunicateFeignClient;
private final IHeartbeatService heartbeatService;
private final LogMessageTemplate logMessageTemplate;
private final CsDeviceRegistryFeignClient csDeviceRegistryFeignClient;
@Autowired
Validator validator;
@@ -136,6 +137,8 @@ public class MqttMessageHandler {
list.add(csTopic);
});
csTopicService.addTopic(nDid,list);
String version = list.stream().map(CsTopic::getVersion).filter(Objects::nonNull).findFirst().orElse("V1");
redisUtil.saveByKeyWithExpire(nDid +":version",version,30L);
logMessageTemplate.sendMember(logDto);
} else {
logDto.setResult(0);
@@ -223,7 +226,7 @@ public class MqttMessageHandler {
//业务处理
Gson gson = new Gson();
ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class);
redisUtil.saveByKeyWithExpire("devResponse",res.getCode(),5L);
//redisUtil.saveByKeyWithExpire("devResponse",res.getCode(),5L);
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())) {
switch (res.getType()){
/**
@@ -247,6 +250,7 @@ public class MqttMessageHandler {
logMessageTemplate.sendMember(logDto);
//有异常删除缓存的模板信息
redisUtil.delete(AppRedisKey.MODEL + nDid);
log.error("{}{}", nDid, AccessResponseEnum.MODEL_VERSION_ERROR.getMessage());
throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR);
}
//校验前置传递的装置模板库中是否存在
@@ -291,7 +295,7 @@ public class MqttMessageHandler {
});
//存储模板id
String key2 = AppRedisKey.MODEL + nDid;
redisUtil.saveByKeyWithExpire(key2,modelList,600L);
redisUtil.saveByKey(key2,modelList);
//存储监测点模板信息,用于界面回显
List<String> modelId = modelList.stream().map(CsModelDto::getModelId).collect(Collectors.toList());
List<CsLineModel> lineList = csLineModelService.getMonitorNumByModelId(modelId);
@@ -353,11 +357,14 @@ public class MqttMessageHandler {
String id = IdUtil.fastSimpleUUID();
csSoftInfoPo.setId(id);
DateTimeFormatter formatter = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd[[HH][:mm][:ss]]")
// 优先尝试紧凑格式
.optionalStart().appendPattern("yyyyMMdd").optionalEnd()
// 再尝试带横线格式
.optionalStart().appendPattern("yyyy-MM-dd").optionalEnd()
// 默认时间部分
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.parseDefaulting(ChronoField.MILLI_OF_SECOND, 0)
.toFormatter();
LocalDateTime localDateTime = LocalDateTime.parse(softInfo.getAppDate(), formatter);
assertThat(localDateTime).isNotNull();
@@ -376,12 +383,13 @@ public class MqttMessageHandler {
List<RspDataDto.LdevInfo> devInfo = JSON.parseArray(JSON.toJSONString(rspDataDto.getDataArray()), RspDataDto.LdevInfo.class);
if (CollectionUtil.isNotEmpty(devInfo)){
if (Objects.equals(res.getDid(),1)){
logDto.setOperate("系统端收到装置端"+nDid+"更新APF容量报文code = " + res.getCode());
logDto.setResult(1);
redisUtil.saveByKeyWithExpire("lineInfo:"+nDid,devInfo,30L);
List<CsDevCapacityPO> list3 = new ArrayList<>();
boolean hasZeroClDid = devInfo.stream().anyMatch(item -> item.getClDid() == 0);
//治理设备
if (hasZeroClDid) {
logDto.setOperate("系统端收到装置端"+nDid+"更新APF容量报文code = " + res.getCode());
logDto.setResult(1);
devInfo.forEach(item->{
if (Objects.equals(item.getClDid(),0)){
updateLineInfo(nDid,item);
@@ -396,6 +404,8 @@ public class MqttMessageHandler {
}
//其余设备
else {
logDto.setOperate("系统端收到装置端"+nDid+"更新监测点台账报文code = " + res.getCode());
logDto.setResult(1);
devInfo.forEach(item->{
updateLineInfo(nDid,item);
});
@@ -464,8 +474,15 @@ public class MqttMessageHandler {
}
public void updateLineInfo(String nDid,RspDataDto.LdevInfo item) {
CsDeviceRegistry csDeviceRegistry = csDeviceRegistryFeignClient.queryByCurrentNdidAndClDid(nDid, item.getClDid()).getData();
String lineId;
if (Objects.isNull(csDeviceRegistry)) {
lineId = nDid + item.getClDid();
} else {
lineId = csDeviceRegistry.getId();
}
CsLineParam csLineParam = new CsLineParam();
csLineParam.setLineId(nDid.concat(item.getClDid().toString()));
csLineParam.setLineId(lineId);
csLineParam.setVolGrade(item.getVolGrade());
csLineParam.setPtRatio(item.getPtRatio());
csLineParam.setCtRatio(item.getCtRatio());
@@ -474,8 +491,8 @@ public class MqttMessageHandler {
csLineFeignClient.updateLine(csLineParam);
//生成监测点限值
Overlimit overlimit = COverlimitUtil.globalAssemble(item.getVolGrade().floatValue(),10f,10f,10f,0,0);
overlimit.setId(nDid.concat(item.getClDid().toString()));
overLimitWlMapper.deleteById(nDid.concat(item.getClDid().toString()));
overlimit.setId(lineId);
overLimitWlMapper.deleteById(lineId);
overLimitWlMapper.insert(overlimit);
}
@@ -557,6 +574,7 @@ public class MqttMessageHandler {
JSONObject jsonObject2 = JSONObject.parseObject(JSON.toJSONString(dataDto));
AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject2, AppAutoDataMessage.class);
appAutoDataMessage.setId(nDid);
redisUtil.saveByKeyWithExpire("devResponse:" + nDid ,200,5L);
rtFeignClient.analysis(appAutoDataMessage);
break;
//处理主动上送的统计数据、电度数据
@@ -709,6 +727,9 @@ public class MqttMessageHandler {
* 3监测点pt/ct信息
*/
public void askDevData(String nDid,String version,Integer type,Integer mid) {
//获取文件模板信息
List<CsModelDto> modelList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid),CsModelDto.class);
for (CsModelDto model : modelList) {
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统");
logDto.setLoginName("系统");
@@ -725,26 +746,26 @@ public class MqttMessageHandler {
askDataDto.setEndTime(-1);
switch (type) {
case 1:
reqAndResParam.setDid(0);
reqAndResParam.setDid(model.getDid());
askDataDto.setCldid(0);
askDataDto.setDataType(1);
logDto.setOperate("系统端向装置端"+nDid+"询问软件信息");
break;
case 2:
reqAndResParam.setDid(1);
reqAndResParam.setDid(model.getDid());
askDataDto.setCldid(-1);
askDataDto.setDataType(2);
logDto.setOperate("系统端向装置端"+nDid+"询问逻辑设备1信息");
break;
case 3:
reqAndResParam.setDid(2);
reqAndResParam.setDid(model.getDid());
askDataDto.setCldid(-1);
askDataDto.setDataType(2);
logDto.setOperate("系统端向装置端"+nDid+"询问逻辑设备2信息");
break;
//询问工程信息
case 48:
reqAndResParam.setDid(1);
reqAndResParam.setDid(model.getDid());
askDataDto.setCldid(1);
askDataDto.setDataType(48);
logDto.setOperate("系统端向装置端"+nDid+"询问工程信息");
@@ -756,6 +777,7 @@ public class MqttMessageHandler {
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
logMessageTemplate.sendMember(logDto);
}
}
public String getEnum(Integer code) {
String result = null;

View File

@@ -6,7 +6,6 @@ import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.mq.template.LogMessageTemplate;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.enums.DicDataEnum;

View File

@@ -32,4 +32,6 @@ public interface ICsDataSetService extends IService<CsDataSet> {
*/
List<CsDataSet> getDataSetData(String modelId);
List<CsDataSet> getDataSetData2(String modelId);
}

View File

@@ -58,4 +58,11 @@ public interface ICsDeviceService {
String autoPortableLedger();
String onlineRegister(String projectId,String nDid);
/**
* 设备修改mac之后需要手动重新接入因为已经存在台账了只要询问主题、发起接入请求即可
* @param nDid
* @return
*/
String accessByUpdateMac(String nDid);
}

View File

@@ -10,7 +10,9 @@ import com.njcn.access.pojo.dto.ControlDto;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.service.AskDeviceDataService;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.enums.AlgorithmResponseEnum;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.mq.message.RealDataMessage;
import com.njcn.mq.template.RealDataMessageTemplate;
import com.njcn.redis.pojo.enums.AppRedisKey;
@@ -33,6 +35,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService {
private static final Logger log = LoggerFactory.getLogger(AskDeviceDataServiceImpl.class);
private final MqttPublisher publisher;
private final CsTopicFeignClient csTopicFeignClient;
private final CsLineFeignClient csLineFeignClient;
private final RedisUtil redisUtil;
private final RealDataMessageTemplate realDataMessageTemplate;
private static Integer mid = 1;
@@ -218,8 +221,8 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService {
public void askCldRealData(String devId, String lineId, String nodeId, Integer idx) {
RealDataMessage realDataMessage = new RealDataMessage();
realDataMessage.setDevSeries(devId);
int lastDigit = Character.getNumericValue(lineId.charAt(lineId.length() - 1));
realDataMessage.setLine(lastDigit);
CsLinePO po = csLineFeignClient.getById(lineId).getData();
realDataMessage.setLine(po.getLineNo());
realDataMessage.setRealData(true);
realDataMessage.setSoeData(true);
realDataMessage.setLimit(20);

View File

@@ -6,6 +6,7 @@ import com.njcn.access.service.ICsDataSetService;
import com.njcn.csdevice.pojo.po.CsDataSet;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
/**
@@ -37,4 +38,13 @@ public class CsDataSetServiceImpl extends ServiceImpl<CsDataSetMapper, CsDataSet
.and(item->item.eq(CsDataSet::getDataType,"Stat").or().isNull(CsDataSet::getDataType))
.list();
}
@Override
public List<CsDataSet> getDataSetData2(String modelId) {
return this.lambdaQuery()
.eq(CsDataSet::getPid, modelId)
.and(item->item.eq(CsDataSet::getDataType,"Stat").or().isNull(CsDataSet::getDataType))
.eq(CsDataSet::getStoreFlag,1)
.list();
}
}

View File

@@ -2,6 +2,7 @@ package com.njcn.access.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -10,7 +11,9 @@ import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.AccessResponseEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.param.DevAccessParam;
import com.njcn.access.pojo.RspDataDto;
import com.njcn.access.pojo.dto.AccessDto;
import com.njcn.access.pojo.dto.AskDataDto;
import com.njcn.access.pojo.dto.CsModelDto;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.pojo.param.DeviceStatusParam;
@@ -25,7 +28,10 @@ import com.njcn.csdevice.param.LineInfoParam;
import com.njcn.csdevice.pojo.param.*;
import com.njcn.csdevice.pojo.po.*;
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.csharmonic.api.CsHarmonicPlanFeignClient;
import com.njcn.csharmonic.api.CsHarmonicPlanLineFeignClient;
import com.njcn.csharmonic.param.CsHarmonicPlanLineParam;
import com.njcn.csharmonic.pojo.po.CsHarmonicPlan;
import com.njcn.mq.message.LogMessage;
import com.njcn.mq.template.LogMessageTemplate;
import com.njcn.redis.pojo.enums.AppRedisKey;
@@ -89,8 +95,10 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
private final AppProjectFeignClient appProjectFeignClient;
private final DeviceMessageFeignClient deviceMessageFeignClient;
private final CsHarmonicPlanLineFeignClient csHarmonicPlanLineFeignClient;
private final CsHarmonicPlanFeignClient csHarmonicPlanFeignClient;
private final LogMessageTemplate logMessageTemplate;
private final StringRedisTemplate stringRedisTemplate;
private final CsDeviceRegistryFeignClient csDeviceRegistryFeignClient;
@Override
@@ -183,9 +191,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
Object model = null;
try {
String key = AppRedisKey.LINE + nDid;
model = redisUtil.getObjectByKey(key);
for (int i = 0; i < 3 ; i++) {
Thread.sleep(1000);
model = redisUtil.getObjectByKey(key);
if (!Objects.isNull(model)){
break;
}
@@ -218,6 +226,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
String version = csTopicService.getVersion(devAccessParam.getNDid());
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(devAccessParam.getNDid()).getData();
List<CsLinePO> csLinePoList = new ArrayList<>();
List<CsDeviceRegistry> csDeviceRegistryList = new ArrayList<>();
List<AppLineTopologyDiagramPO> appLineTopologyDiagramPoList = new ArrayList<>();
//1.录入装置台账信息
CsLedgerParam csLedgerParam = new CsLedgerParam();
@@ -237,6 +246,13 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
}
//3.监测点表录入关系
List<CsDeviceRegistry> data = csDeviceRegistryFeignClient.queryByCurrentNdid(devAccessParam.getNDid()).getData();
Map<Integer, String> clDidToIdMap;
if (CollUtil.isNotEmpty(data)) {
clDidToIdMap = data.stream().collect(Collectors.toMap(CsDeviceRegistry::getClDid, CsDeviceRegistry::getId, (a, b) -> a));
} else {
clDidToIdMap = new HashMap<>();
}
for (DevAccessParam.LineParam item : devAccessParam.getList()) {
String location = dicDataFeignClient.getDicDataById(item.getPosition()).getData().getCode();
CsLinePO po = new CsLinePO();
@@ -246,38 +262,48 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
po.setDeviceId(vo.getId());
po.setPosition(item.getPosition());
po.setClDid(0);
String uuid = IdUtil.fastSimpleUUID();
if (Objects.equals(DicDataEnum.GRID_SIDE.getCode(),location)){
po.setLineId(devAccessParam.getNDid() + "1");
if (!Objects.isNull(clDidToIdMap.get(1))) {
uuid = clDidToIdMap.get(1);
}
po.setLineId(uuid);
String id = Objects.requireNonNull(modelId.stream().filter(it -> Objects.equals(it.getDid(), 2)).findFirst().orElse(null)).getModelId();
po.setDataModelId(id);
//获取模板下数据集
List<CsDataSet> dataSets = csDataSetService.getDataSetData(id);
String dataSetId = Objects.requireNonNull(dataSets.stream().filter(it -> Objects.equals(it.getClDev(), 1)&&Objects.equals(it.getType(), 2)).findFirst().orElse(null)).getId();
po.setDataSetId(dataSetId);
param.setId(devAccessParam.getNDid() + "1");
appLineTopologyDiagramPo.setLineId(devAccessParam.getNDid() + "1");
param.setId(uuid);
appLineTopologyDiagramPo.setLineId(uuid);
po.setClDid(1);
} else if (Objects.equals(DicDataEnum.LOAD_SIDE.getCode(),location)){
po.setLineId(devAccessParam.getNDid() + "2");
if (!Objects.isNull(clDidToIdMap.get(2))) {
uuid = clDidToIdMap.get(2);
}
po.setLineId(uuid);
String id = Objects.requireNonNull(modelId.stream().filter(it -> Objects.equals(it.getDid(), 2)).findFirst().orElse(null)).getModelId();
po.setDataModelId(id);
//获取模板下数据集
List<CsDataSet> dataSets = csDataSetService.getDataSetData(id);
String dataSetId = Objects.requireNonNull(dataSets.stream().filter(it -> Objects.equals(it.getClDev(), 2)&&Objects.equals(it.getType(), 2)).findFirst().orElse(null)).getId();
po.setDataSetId(dataSetId);
param.setId(devAccessParam.getNDid() + "2");
appLineTopologyDiagramPo.setLineId(devAccessParam.getNDid() + "2");
param.setId(uuid);
appLineTopologyDiagramPo.setLineId(uuid);
po.setClDid(2);
} else {
po.setLineId(devAccessParam.getNDid() + "0");
if (!Objects.isNull(clDidToIdMap.get(0))) {
uuid = clDidToIdMap.get(0);
}
po.setLineId(uuid);
String id = Objects.requireNonNull(modelId.stream().filter(it -> Objects.equals(it.getDid(), 1)).findFirst().orElse(null)).getModelId();
po.setDataModelId(id);
//获取模板下数据集
List<CsDataSet> dataSets = csDataSetService.getDataSetData(id);
String dataSetId = Objects.requireNonNull(dataSets.stream().filter(it -> Objects.equals(it.getClDev(), 0)&&Objects.equals(it.getType(), 0)).findFirst().orElse(null)).getId();
po.setDataSetId(dataSetId);
param.setId(devAccessParam.getNDid() + "0");
appLineTopologyDiagramPo.setLineId(devAccessParam.getNDid() + "0");
param.setId(uuid);
appLineTopologyDiagramPo.setLineId(uuid);
}
po.setStatus(1);
csLinePoList.add(po);
@@ -292,6 +318,16 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
appLineTopologyDiagramPo.setStatus("1");
appLineTopologyDiagramPo.setTarget(item.getTarget());
appLineTopologyDiagramPoList.add(appLineTopologyDiagramPo);
if (CollUtil.isEmpty(data)) {
//存储设备注册表数据用来后续保留监测点id
CsDeviceRegistry csDeviceRegistry = new CsDeviceRegistry();
csDeviceRegistry.setId(uuid);
csDeviceRegistry.setCurrentNdid(devAccessParam.getNDid());
csDeviceRegistry.setOldNdid(devAccessParam.getNDid());
csDeviceRegistry.setClDid(po.getClDid());
csDeviceRegistry.setFirstSeenTime(LocalDateTime.now());
csDeviceRegistryList.add(csDeviceRegistry);
}
}
List<String> position = csLinePoList.stream().map(CsLinePO::getPosition).collect(Collectors.toList());
List<String> lineList = position.stream().filter(e-> Collections.frequency(position,e) > 1).distinct().collect(Collectors.toList());
@@ -304,8 +340,18 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
//删除监测点稳态指标告警的默认指标配置
List<String> lineIdList = csLinePoList.stream().map(CsLinePO::getLineId).collect(Collectors.toList());
csHarmonicPlanLineFeignClient.deleteByLineIds(lineIdList);
List<CsHarmonicPlan> planList = csHarmonicPlanFeignClient.getByName("通用方案").getData();
if (CollectionUtil.isNotEmpty(planList)) {
CsHarmonicPlan plan = planList.get(0);
CsHarmonicPlanLineParam param1 = new CsHarmonicPlanLineParam();
param1.setId(plan.getId());
param1.setLineIds(lineIdList);
csHarmonicPlanLineFeignClient.savePlanLines(param1);
}
csLineService.saveBatch(csLinePoList);
if (CollectionUtil.isNotEmpty(csDeviceRegistryList)) {
csDeviceRegistryFeignClient.add(csDeviceRegistryList);
}
redisUtil.saveByKeyWithExpire("accessLineInfo:" + devAccessParam.getNDid(),csLinePoList,30L);
//缓存监测点信息
LineInfoParam param = new LineInfoParam();
@@ -402,6 +448,8 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
appLineTopologyDiagramPOQueryWrapper.in("line_id",collect);
appLineTopologyDiagramService.remove(appLineTopologyDiagramPOQueryWrapper);
}
//清空主题
csTopicService.deleteByNDid(nDid);
redisUtil.deleteKeysByString(AppRedisKey.LINE_POSITION+nDid);
}
@@ -453,9 +501,22 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
throwExceptionAndLog(nDid,AccessResponseEnum.MODEL_ERROR, logDto);
}
List<CsDataSet> list = csDataSetService.getDataSetData(modelList.get(0).getModelId());
List<CsDeviceRegistry> csDeviceRegistryList = new ArrayList<>();
List<CsDeviceRegistry> data = csDeviceRegistryFeignClient.queryByCurrentNdid(nDid).getData();
Map<Integer, String> clDidToIdMap;
if (CollUtil.isNotEmpty(data)) {
clDidToIdMap = data.stream().collect(Collectors.toMap(CsDeviceRegistry::getClDid, CsDeviceRegistry::getId, (a, b) -> a));
} else {
clDidToIdMap = new HashMap<>();
}
list.forEach(item->{
String uuid = IdUtil.fastSimpleUUID();
if (!Objects.isNull(clDidToIdMap.get(item.getClDev()))) {
uuid = clDidToIdMap.get(item.getClDev());
}
CsLinePO po = new CsLinePO();
po.setLineId(nDid + item.getClDev().toString());
po.setLineId(uuid);
po.setName(item.getClDev().toString() + "#监测点");
po.setStatus(1);
po.setClDid(item.getClDev());
@@ -470,14 +531,28 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
}
//3.生成台账树监测点数据
CsLedgerParam param = new CsLedgerParam();
param.setId(nDid + item.getClDev().toString());
param.setId(uuid);
param.setPid(vo.getId());
param.setName(item.getClDev().toString() + "#监测点");
param.setLevel(3);
param.setSort(0);
csLedgerService.addLedgerTree(param);
//4.存储设备注册表数据用来后续保留监测点id
if (CollUtil.isEmpty(data)) {
//存储设备注册表数据用来后续保留监测点id
CsDeviceRegistry csDeviceRegistry = new CsDeviceRegistry();
csDeviceRegistry.setId(uuid);
csDeviceRegistry.setCurrentNdid(nDid);
csDeviceRegistry.setOldNdid(nDid);
csDeviceRegistry.setClDid(po.getClDid());
csDeviceRegistry.setFirstSeenTime(LocalDateTime.now());
csDeviceRegistryList.add(csDeviceRegistry);
}
});
csLineService.saveBatch(csLinePoList);
if (CollectionUtil.isNotEmpty(csDeviceRegistryList)) {
csDeviceRegistryFeignClient.add(csDeviceRegistryList);
}
redisUtil.saveByKeyWithExpire("accessLineInfo:" + nDid,csLinePoList,30L);
//缓存监测点信息
LineInfoParam param = new LineInfoParam();
@@ -499,10 +574,10 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
Thread.sleep(2000);
String version = "V1";
devAccessAskTemplate(nDid,version,1);
//判断接入状态
Object object = redisUtil.getObjectByKey("online" + nDid);
for (int i = 0; i < 5; i++) {
Thread.sleep(1000);
//判断接入状态
Object object = redisUtil.getObjectByKey("online" + nDid);
if (Objects.nonNull(object)) {
result = "success";
break;
@@ -575,11 +650,11 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
if (csLedger == null) {
throw new BusinessException("未找到装置绑定的项目,请重新绑定");
}
List<CsDeviceRegistry> csDeviceRegistryList = new ArrayList<>();
// 根据模板接入装置
LogMessage logDto = new LogMessage();
logDto.setUserIndex(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setOperate("监测装置"+nDid+"注册、接入");
logDto.setOperate("系统端向监测装置端"+nDid+"发送注册、接入请求");
logDto.setResult(1);
try {
@@ -602,39 +677,111 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csLedgerParam.setLevel(2);
csLedgerParam.setSort(0);
csLedgerService.addLedgerTree(csLedgerParam);
//2.根据模板获取监测点个数,插入监测点表
Thread.sleep(2000);
List<CsModelDto> modelList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid),CsModelDto.class);
//2.获取模板数据
List<CsModelDto> modelList = new ArrayList<>();
for (int i = 0; i < 2; i++) {
Thread.sleep(1000);
modelList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid),CsModelDto.class);
if (CollUtil.isNotEmpty(modelList)) {
break;
}
}
if (CollUtil.isEmpty(modelList)) {
throwExceptionAndLog(nDid,AccessResponseEnum.MODEL_ERROR, logDto);
}
List<CsDataSet> list = csDataSetService.getDataSetData(modelList.get(0).getModelId());
list.forEach(item->{
List<CsDataSet> list = csDataSetService.getDataSetData2(modelList.get(0).getModelId());
Map<Integer, List<CsDataSet>> map = list.stream().collect(Collectors.groupingBy(CsDataSet::getConType));
//询问监测点个数
String version = redisUtil.getObjectByKey(nDid +":version").toString();
if (Objects.isNull(version)) {
version = "V1";
}
askDevData(nDid,version,2,2);
List<RspDataDto.LdevInfo> lDevInfos = new ArrayList<>();
for (int i = 0; i < 2; i++) {
Thread.sleep(1000);
lDevInfos= channelObjectUtil.objectToList( redisUtil.getObjectByKey("lineInfo:"+nDid),RspDataDto.LdevInfo.class);
if (CollUtil.isNotEmpty(lDevInfos)) {
break;
}
}
if (CollUtil.isEmpty(lDevInfos)) {
throwExceptionAndLog(nDid,AccessResponseEnum.DEV_DATA_ERROR, logDto);
}
//查询是否存在历史记录存储
List<CsDeviceRegistry> data = csDeviceRegistryFeignClient.queryByCurrentNdid(nDid).getData();
// 1. 预加载已存在的 lineId避免循环内查库
Set<String> existingLineIds = csLineService.getBaseMapper().selectList(null)
.stream()
.map(CsLinePO::getLineId)
.collect(Collectors.toSet());
// 2. 构建 clDid -> id 的映射(统一处理新增/已存在两种场景)
Map<Integer, String> clDidToIdMap;
if (CollUtil.isNotEmpty(data)) {
clDidToIdMap = data.stream().collect(Collectors.toMap(CsDeviceRegistry::getClDid, CsDeviceRegistry::getId, (a, b) -> a));
} else {
clDidToIdMap = new HashMap<>();
}
// 3. 提取公共默认 modelId
String defaultModelId = CollUtil.isNotEmpty(modelList) ? modelList.get(0).getModelId() : null;
// 4. 统一处理
for (RspDataDto.LdevInfo item : lDevInfos) {
Integer clDid = item.getClDid();
// 4.1 获取或生成 ID
String id = clDidToIdMap.computeIfAbsent(clDid, k -> {
String newId = IdUtil.fastSimpleUUID();
// 仅在不存在时创建注册表记录
CsDeviceRegistry registry = new CsDeviceRegistry();
registry.setId(newId);
registry.setCurrentNdid(nDid);
registry.setOldNdid(nDid);
registry.setClDid(k);
registry.setFirstSeenTime(LocalDateTime.now());
csDeviceRegistryList.add(registry);
return newId;
});
// 4.2 查找匹配的 DataSet带空值保护
int conTypeKey = Objects.equals(2, item.getConType()) ? 1 : item.getConType();
List<CsDataSet> dataSetList = map.getOrDefault(conTypeKey, Collections.emptyList());
CsDataSet dataSet = dataSetList.stream()
.filter(it -> Objects.equals(it.getClDev(), clDid))
.findFirst()
.orElseGet(() -> dataSetList.stream()
.filter(it -> Objects.equals(it.getClDev(), 1))
.findFirst()
.orElse(null));
if (dataSet == null) {
log.warn("未找到匹配的 DataSet, clDid={}, conType={}", clDid, item.getConType());
continue;
}
// 4.3 构建 CsLinePO去重判断改为内存 Set 查询)
if (!existingLineIds.contains(id)) {
CsLinePO po = new CsLinePO();
po.setLineId(nDid + item.getClDev().toString());
po.setName(item.getClDev().toString() + "#监测点");
po.setLineId(id);
po.setName(clDid + "#监测点");
po.setStatus(1);
po.setClDid(item.getClDev());
po.setLineNo(item.getClDev());
po.setClDid(clDid);
po.setLineNo(clDid);
po.setRunStatus(0);
po.setDeviceId(vo.getId());
po.setDataSetId(item.getId());
po.setDataModelId(item.getPid());
//防止主键重复
QueryWrapper<CsLinePO> qw = new QueryWrapper<>();
qw.eq("line_id",po.getLineId());
if(csLineService.getBaseMapper().selectList(qw).isEmpty()){
po.setDataSetId(dataSet.getId());
po.setDataModelId(defaultModelId);
csLinePoList.add(po);
existingLineIds.add(id);
}
//3.生成台账树监测点数据
// 4.4 生成台账树
CsLedgerParam param = new CsLedgerParam();
param.setId(nDid + item.getClDev().toString());
param.setId(id);
param.setPid(vo.getId());
param.setName(item.getClDev().toString() + "#监测点");
param.setName(clDid + "#监测点");
param.setLevel(3);
param.setSort(0);
param.setSort(clDid);
csLedgerService.addLedgerTree(param);
});
}
//存储监测点信息
csLinePoList.sort(Comparator.comparing(CsLinePO::getClDid));
csLineService.saveBatch(csLinePoList);
redisUtil.saveByKeyWithExpire("accessLineInfo:" + nDid,csLinePoList,30L);
//缓存监测点信息
@@ -642,6 +789,10 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
param.setNDid(nDid);
param.setList(csLinePoList);
deviceMessageFeignClient.getLineInfo(param);
//存储装置注册表数据
if (CollUtil.isNotEmpty(csDeviceRegistryList)) {
csDeviceRegistryFeignClient.add(csDeviceRegistryList);
}
//4.生成装置和模板的关系表
CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm();
csDevModelRelationAddParm.setDevId(vo.getId());
@@ -660,21 +811,31 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
//7.删除redis监测点模板信息
redisUtil.delete(AppRedisKey.MODEL + nDid);
redisUtil.delete(AppRedisKey.LINE + nDid);
//8.新增稳态事件指标配置
List<String> lineIdList = csLinePoList.stream().map(CsLinePO::getLineId).collect(Collectors.toList());
csHarmonicPlanLineFeignClient.deleteByLineIds(lineIdList);
List<CsHarmonicPlan> planList = csHarmonicPlanFeignClient.getByName("通用方案").getData();
if (CollectionUtil.isNotEmpty(planList)) {
CsHarmonicPlan plan = planList.get(0);
CsHarmonicPlanLineParam param1 = new CsHarmonicPlanLineParam();
param1.setId(plan.getId());
param1.setLineIds(lineIdList);
csHarmonicPlanLineFeignClient.savePlanLines(param1);
}
//发起自动接入请求
Thread.sleep(2000);
//先获取版本
String version = "V1";
devAccessAskTemplate(nDid,version,1);
//判断接入状态
Object object = redisUtil.getObjectByKey("online" + nDid);
for (int i = 0; i < 5; i++) {
Thread.sleep(1000);
//判断接入状态
Object object = redisUtil.getObjectByKey("online" + nDid);
if (Objects.nonNull(object)) {
result = "success";
break;
}
}
//8.存储日志
//9.存储日志
logMessageTemplate.sendMember(logDto);
} catch (Exception e) {
logDto.setResult(0);
@@ -692,6 +853,103 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
return result;
}
@Override
public String accessByUpdateMac(String nDid) {
String result = "fail";
// 根据模板接入装置
LogMessage logDto = new LogMessage();
logDto.setUserIndex(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setOperate("装置修改mac系统端"+nDid+"发送接入请求");
logDto.setResult(1);
try {
// 装置状态判断
checkDeviceStatus(nDid);
// 询问装置支持的主题信息,并将支持的主题入库
askAndStoreTopics(nDid);
Thread.sleep(2000);
String version = redisUtil.getObjectByKey(nDid +":version").toString();
if (Objects.isNull(version)) {
version = "V1";
}
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(1,TypeEnum.TYPE_5.getCode())), 1, false);
//判断接入状态
for (int i = 0; i < 5; i++) {
Thread.sleep(1000);
Object object = redisUtil.getObjectByKey("online" + nDid);
if (Objects.nonNull(object)) {
result = "success";
//修改设备注册表的接入状态
csDeviceRegistryFeignClient.updateIsAccessByCurrentNdid(nDid,1);
break;
}
}
//9.存储日志
logMessageTemplate.sendMember(logDto);
} catch (Exception e) {
logDto.setResult(0);
logDto.setFailReason(e.getMessage());
logMessageTemplate.sendMember(logDto);
throw new BusinessException(e.getMessage());
}
return result;
}
/**
* type含义
* 1询问设备软件信息
* 2模块信息
* 3监测点pt/ct信息
*/
public void askDevData(String nDid,String version,Integer type,Integer mid){
LogMessage logDto = new LogMessage();
logDto.setUserIndex("系统");
logDto.setLoginName("系统");
logDto.setResult(1);
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
reqAndResParam.setMid(mid);
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_6.getCode()));
reqAndResParam.setExpire(-1);
AskDataDto askDataDto = new AskDataDto();
askDataDto.setDataAttr(0);
askDataDto.setOperate(1);
askDataDto.setStartTime(-1);
askDataDto.setEndTime(-1);
switch (type) {
case 1:
reqAndResParam.setDid(0);
askDataDto.setCldid(0);
askDataDto.setDataType(1);
logDto.setOperate("系统端向装置端"+nDid+"询问软件信息");
break;
case 2:
reqAndResParam.setDid(1);
askDataDto.setCldid(-1);
askDataDto.setDataType(2);
logDto.setOperate("系统端向装置端"+nDid+"询问逻辑设备1信息");
break;
case 3:
reqAndResParam.setDid(2);
askDataDto.setCldid(-1);
askDataDto.setDataType(2);
logDto.setOperate("系统端向装置端"+nDid+"询问逻辑设备2信息");
break;
//询问工程信息
case 48:
reqAndResParam.setDid(1);
askDataDto.setCldid(1);
askDataDto.setDataType(48);
logDto.setOperate("系统端向装置端"+nDid+"询问工程信息");
break;
default:
break;
}
reqAndResParam.setMsg(askDataDto);
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
logMessageTemplate.sendMember(logDto);
}
private void checkDeviceStatus(String nDid) {
LogMessage logDto = createLogDto("当前装置"+nDid+"状态判断");
CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid);

View File

@@ -8,14 +8,8 @@ import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.access.utils.RedisSetUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
import com.njcn.csdevice.api.DataSetFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsDataSet;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.pojo.po.*;
import com.njcn.mq.message.AppAutoDataMessage;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.rt.enums.RtResponseEnum;
@@ -53,12 +47,19 @@ public class RtServiceImpl implements IRtService {
private final MqttPublisher publisher;
private final RedisSetUtil redisSetUtil;
private final EquipmentFeignClient equipmentFeignClient;
private final CsDeviceRegistryFeignClient csDeviceRegistryFeignClient;
@Override
public void analysis(AppAutoDataMessage appAutoDataMessage) {
List<CsDataArray> dataArrayList;
//监测点id
String lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
String lineId;
CsDeviceRegistry csDeviceRegistry = csDeviceRegistryFeignClient.queryByCurrentNdidAndClDid(appAutoDataMessage.getId(),appAutoDataMessage.getMsg().getClDid()).getData();
if (Objects.isNull(csDeviceRegistry)) {
lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
} else {
lineId = csDeviceRegistry.getId();
}
redisUtil.delete("cldRtDataOverTime:"+lineId);
//获取监测点基础信息
CsLinePO po = csLineFeignClient.getById(lineId).getData();

View File

@@ -116,7 +116,7 @@ public class StatServiceImpl implements IStatService {
}
//云前置设备
else if (Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)) {
lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get(appAutoDataMessage.getMsg().getClDid().toString()).toString();
}
//获取当前设备信息
@@ -168,16 +168,16 @@ public class StatServiceImpl implements IStatService {
csLineLatestDataFeignClient.addData(csLineLatestData);
}
//判断设备运行状态
if (!Objects.isNull(po.getRunStatus()) && po.getRunStatus() == 1) {
csDeviceFeignClient.updateRunStatus(appAutoDataMessage.getId(), AccessEnum.ONLINE.getCode());
//记录设备上线
PqsCommunicateDto dto = new PqsCommunicateDto();
dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
dto.setDevId(appAutoDataMessage.getId());
dto.setType(1);
dto.setDescription("通讯正常");
csCommunicateFeignClient.insertion(dto);
}
// if (!Objects.isNull(po.getRunStatus()) && po.getRunStatus() == 1) {
// csDeviceFeignClient.updateRunStatus(appAutoDataMessage.getId(), AccessEnum.ONLINE.getCode());
// //记录设备上线
// PqsCommunicateDto dto = new PqsCommunicateDto();
// dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN)));
// dto.setDevId(appAutoDataMessage.getId());
// dto.setType(1);
// dto.setDescription("通讯正常");
// csCommunicateFeignClient.insertion(dto);
// }
}
}