设备自动连接逻辑优化

This commit is contained in:
xy
2026-04-08 10:18:07 +08:00
parent b3d2727a64
commit c2b48d6830
9 changed files with 214 additions and 9 deletions

View File

@@ -0,0 +1,20 @@
package com.njcn.access.api;
import com.njcn.access.api.fallback.CsDeviceClientFallbackFactory;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.response.HttpResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
/**
* @author xy
*/
@FeignClient(value = ServerInfo.ACCESS_BOOT, path = "/device", fallbackFactory = CsDeviceClientFallbackFactory.class,contextId = "device")
public interface CsDeviceFeignClient {
@PostMapping("/updateRunStatus")
HttpResult<String> updateRunStatus(@RequestParam("nDid") String nDid, @RequestParam("runStatus") Integer runStatus);
}

View File

@@ -0,0 +1,34 @@
package com.njcn.access.api.fallback;
import com.njcn.access.api.CsDeviceFeignClient;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author xy
*/
@Slf4j
@Component
public class CsDeviceClientFallbackFactory implements FallbackFactory<CsDeviceFeignClient> {
@Override
public CsDeviceFeignClient create(Throwable cause) {
//判断抛出异常是否为解码器抛出的业务异常
Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
if (cause.getCause() instanceof BusinessException) {
BusinessException businessException = (BusinessException) cause.getCause();
}
Enum<?> finalExceptionEnum = exceptionEnum;
return new CsDeviceFeignClient() {
@Override
public HttpResult<String> updateRunStatus(String nDid, Integer runStatus) {
log.error("{}异常,降级处理,异常为:{}","云设备状态调整异常",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -30,7 +30,7 @@ public enum AccessResponseEnum {
DEV_MODEL_NOT_FIND("A0303","装置型号未找到!"), DEV_MODEL_NOT_FIND("A0303","装置型号未找到!"),
DEV_IS_NOT_ZL("A0303","注册装置不是直连装置!"), DEV_IS_NOT_ZL("A0303","注册装置不是直连装置!"),
DEV_IS_NOT_WG("A0303","注册装置不是网关!"), DEV_IS_NOT_WG("A0303","注册装置不是网关!"),
DEV_IS_NOT_PORTABLE("A0303","注册装置不是便携式装置!"), DEV_IS_NOT_PORTABLE("A0303","注册装置不是便携式装置或者在线监测设备!"),
REGISTER_RESPONSE_ERROR("A0304","装置注册,装置侧应答失败!"), REGISTER_RESPONSE_ERROR("A0304","装置注册,装置侧应答失败!"),
ACCESS_RESPONSE_ERROR("A0304","装置注册,装置侧应答失败!"), ACCESS_RESPONSE_ERROR("A0304","装置注册,装置侧应答失败!"),

View File

@@ -2,6 +2,7 @@ package com.njcn.access.controller;
import com.njcn.access.param.DevAccessParam; import com.njcn.access.param.DevAccessParam;
import com.njcn.access.service.ICsDeviceService; import com.njcn.access.service.ICsDeviceService;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.enums.response.CommonResponseEnum;
@@ -34,6 +35,7 @@ import org.springframework.web.bind.annotation.*;
public class CsDeviceController extends BaseController { public class CsDeviceController extends BaseController {
private final ICsDeviceService csDeviceService; private final ICsDeviceService csDeviceService;
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
@OperateInfo(info = LogEnum.BUSINESS_COMMON) @OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/register") @PostMapping("/register")
@@ -130,4 +132,32 @@ public class CsDeviceController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "success", methodDescribe); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "success", methodDescribe);
} }
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/updateRunStatus")
@ApiOperation("设备状态调整")
@ApiImplicitParams({
@ApiImplicitParam(name = "nDid", value = "设备识别码", required = true),
@ApiImplicitParam(name = "runStatus", value = "状态", required = true)
})
public HttpResult<String> updateRunStatus(@RequestParam String nDid, @RequestParam Integer runStatus){
String methodDescribe = getMethodDescribe("updateRunStatus");
csEquipmentDeliveryService.updateRunStatusBynDid(nDid,runStatus);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "success", methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/onlineRegister")
@ApiOperation("监测设备接入")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectId", value = "项目id", required = true),
@ApiImplicitParam(name = "nDid", value = "设备识别码", required = true)
})
@ReturnMsg
public HttpResult<String> onlineRegister(@RequestParam("projectId") String projectId,@RequestParam("nDid") String nDid){
String methodDescribe = getMethodDescribe("onlineRegister");
String result = csDeviceService.onlineRegister(projectId,nDid);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
} }

View File

@@ -32,7 +32,7 @@ import java.util.List;
@Slf4j @Slf4j
@RestController @RestController
@RequestMapping("/csLineLatestData") @RequestMapping("/csLineLatestData")
@Api(tags = "暂降事件") @Api(tags = "治理设备模块运行状态记录")
@AllArgsConstructor @AllArgsConstructor
public class CsLineLatestDataController extends BaseController { public class CsLineLatestDataController extends BaseController {

View File

@@ -56,4 +56,6 @@ public interface ICsDeviceService {
void wlAccess(String nDid); void wlAccess(String nDid);
String autoPortableLedger(); String autoPortableLedger();
String onlineRegister(String projectId,String nDid);
} }

View File

@@ -531,6 +531,113 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
return csProjectPO.getId(); return csProjectPO.getId();
} }
@Override
@Transactional(rollbackFor = Exception.class)
public String onlineRegister(String projectId,String nDid) {
String result = "fail";
// 根据模板接入设备
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setOperate("监测设备"+nDid+"注册、接入");
logDto.setResult(1);
try {
// 设备状态判断
checkDeviceStatus(nDid);
// 询问设备支持的主题信息,并将支持的主题入库
askAndStoreTopics(nDid);
Thread.sleep(2000);
// MQTT询问装置用的模板并判断库中是否存在模板
checkDeviceModel(nDid);
Thread.sleep(2000);
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
List<CsLinePO> csLinePoList = new ArrayList<>();
//1.录入装置台账信息
//新增便携式设备
CsLedgerParam csLedgerParam = new CsLedgerParam();
csLedgerParam.setId(vo.getId());
csLedgerParam.setPid(projectId);
csLedgerParam.setName(vo.getName());
csLedgerParam.setLevel(2);
csLedgerParam.setSort(0);
csLedgerService.addLedgerTree(csLedgerParam);
//2.根据模板获取监测点个数,插入监测点表
Thread.sleep(2000);
List<CsModelDto> modelList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid),CsModelDto.class);
if (CollUtil.isEmpty(modelList)) {
throwExceptionAndLog(nDid,AccessResponseEnum.MODEL_ERROR, logDto);
}
List<CsDataSet> list = csDataSetService.getDataSetData(modelList.get(0).getModelId());
list.forEach(item->{
CsLinePO po = new CsLinePO();
po.setLineId(nDid + item.getClDev().toString());
po.setName(item.getClDev().toString() + "#监测点");
po.setStatus(1);
po.setClDid(item.getClDev());
po.setLineNo(item.getClDev());
po.setRunStatus(0);
po.setDeviceId(vo.getId());
po.setDataSetId(item.getId());
po.setDataModelId(item.getPid());
//防止主键重复
QueryWrapper<CsLinePO> qw = new QueryWrapper<>();
qw.eq("line_id",po.getLineId());
if(csLineService.getBaseMapper().selectList(qw).isEmpty()){
csLinePoList.add(po);
}
//3.生成台账树监测点数据
CsLedgerParam param = new CsLedgerParam();
param.setId(nDid + item.getClDev().toString());
param.setPid(vo.getId());
param.setName(item.getClDev().toString() + "#监测点");
param.setLevel(3);
param.setSort(0);
csLedgerService.addLedgerTree(param);
});
csLineService.saveBatch(csLinePoList);
redisUtil.saveByKeyWithExpire("accessLineInfo:" + nDid,csLinePoList,30L);
//4.生成装置和模板的关系表
CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm();
csDevModelRelationAddParm.setDevId(vo.getId());
csDevModelRelationAddParm.setModelId(modelList.get(0).getModelId());
csDevModelRelationAddParm.setDid(modelList.get(0).getDid());
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
//5.绑定装置和人的关系
CsDeviceUserPO po = new CsDeviceUserPO();
po.setPrimaryUserId(RequestUtil.getUserIndex());
po.setStatus("1");
po.setSubUserId(RequestUtil.getUserIndex());
po.setDeviceId(vo.getId());
csDeviceUserService.saveBatch(Collections.singletonList(po));
//发起自动接入请求
Thread.sleep(2000);
//先获取版本
String version = "V1";
devAccessAskTemplate(nDid,version,1);
//6.修改流程,接入成功即为实际环境
csEquipmentDeliveryService.updateProcessBynDid(nDid,4);
//7.存储日志
csLogsFeignClient.addUserLog(logDto);
//9.删除redis监测点模板信息
redisUtil.delete(AppRedisKey.MODEL + nDid);
redisUtil.delete(AppRedisKey.LINE + nDid);
//判断接入状态
Thread.sleep(5000);
Object object = redisUtil.getObjectByKey("online" + nDid);
if (Objects.nonNull(object)) {
result = "success";
}
} catch (Exception e) {
logDto.setResult(0);
logDto.setFailReason(e.getMessage());
csLogsFeignClient.addUserLog(logDto);
resetFactory(nDid);
throw new BusinessException(e.getMessage());
}
return result;
}
private void checkDeviceStatus(String nDid) { private void checkDeviceStatus(String nDid) {
DeviceLogDTO logDto = createLogDto("当前设备"+nDid+"状态判断"); DeviceLogDTO logDto = createLogDto("当前设备"+nDid+"状态判断");
CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid); CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid);
@@ -542,7 +649,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
throwExceptionAndLog(nDid,AccessResponseEnum.DEV_NOT_FIND, logDto); throwExceptionAndLog(nDid,AccessResponseEnum.DEV_NOT_FIND, logDto);
} }
String code = sysDicTreePo.getCode(); String code = sysDicTreePo.getCode();
if (!Objects.equals(code, DicDataEnum.PORTABLE.getCode())) { if (!Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && !Objects.equals(code, DicDataEnum.DEV_CLD.getCode())) {
throwExceptionAndLog(nDid,AccessResponseEnum.DEV_IS_NOT_PORTABLE, logDto); throwExceptionAndLog(nDid,AccessResponseEnum.DEV_IS_NOT_PORTABLE, logDto);
} }
if (!mqttUtil.judgeClientOnline("NJCN-" + nDid.substring(nDid.length() - 6))) { if (!mqttUtil.judgeClientOnline("NJCN-" + nDid.substring(nDid.length() - 6))) {

View File

@@ -2,7 +2,9 @@ package com.njcn.stat.service.impl;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.access.api.CsDeviceFeignClient;
import com.njcn.access.api.CsLineLatestDataFeignClient; import com.njcn.access.api.CsLineLatestDataFeignClient;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.po.CsLineLatestData; import com.njcn.access.pojo.po.CsLineLatestData;
import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.exception.BusinessException;
@@ -57,6 +59,7 @@ public class StatServiceImpl implements IStatService {
private final RedisUtil redisUtil; private final RedisUtil redisUtil;
private final ChannelObjectUtil channelObjectUtil; private final ChannelObjectUtil channelObjectUtil;
private final CsLineLatestDataFeignClient csLineLatestDataFeignClient; private final CsLineLatestDataFeignClient csLineLatestDataFeignClient;
private final CsDeviceFeignClient csDeviceFeignClient;
private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{ private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{
put("AB", "A"); put("AB", "A");
put("BC", "B"); put("BC", "B");
@@ -106,7 +109,6 @@ public class StatServiceImpl implements IStatService {
//云前置设备 //云前置设备
else if (Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)) { else if (Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)) {
lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid(); lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
} }
//获取当前设备信息 //获取当前设备信息
@@ -133,7 +135,8 @@ public class StatServiceImpl implements IStatService {
default: default:
break; break;
} }
int clDid = Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)?1:appAutoDataMessage.getMsg().getClDid(); boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD");
int clDid = flag?1:appAutoDataMessage.getMsg().getClDid();
String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getStatMethod() + dataArrayParam.getIdx()); String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getStatMethod() + dataArrayParam.getIdx());
Object object = redisUtil.getObjectByKey(key); Object object = redisUtil.getObjectByKey(key);
List<CsDataArray> dataArrayList; List<CsDataArray> dataArrayList;
@@ -142,10 +145,11 @@ public class StatServiceImpl implements IStatService {
} else { } else {
dataArrayList = objectToList(object); dataArrayList = objectToList(object);
} }
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess(),code); List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess(),code,po.getDevAccessMethod());
recordList.addAll(result); recordList.addAll(result);
//获取时间 //获取时间
long devTime = Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)?item.getDataTimeSec():item.getDataTimeSec()-8*3600; boolean timeFlag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD");
long devTime = timeFlag?item.getDataTimeSec():item.getDataTimeSec()-8*3600;
time = Instant.ofEpochSecond(devTime) time = Instant.ofEpochSecond(devTime)
.atZone(ZoneId.systemDefault()) .atZone(ZoneId.systemDefault())
.toLocalDateTime(); .toLocalDateTime();
@@ -159,6 +163,10 @@ public class StatServiceImpl implements IStatService {
csLineLatestData.setTimeId(Objects.isNull(time) ? LocalDateTime.now() : time); csLineLatestData.setTimeId(Objects.isNull(time) ? LocalDateTime.now() : time);
csLineLatestDataFeignClient.addData(csLineLatestData); csLineLatestDataFeignClient.addData(csLineLatestData);
} }
//判断设备运行状态
if (!Objects.isNull(po.getRunStatus()) && po.getRunStatus() == 1) {
csDeviceFeignClient.updateRunStatus(appAutoDataMessage.getId(), AccessEnum.ONLINE.getCode());
}
} }
System.gc(); System.gc();
} }
@@ -205,7 +213,7 @@ public class StatServiceImpl implements IStatService {
/** /**
* influxDB数据组装 * influxDB数据组装
*/ */
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType) { public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType,String accessMethod) {
List<String> records = new ArrayList<String>(); List<String> records = new ArrayList<String>();
//解码 //解码
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData())); List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
@@ -233,7 +241,8 @@ public class StatServiceImpl implements IStatService {
fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i)); fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag()); fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。 //fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, Objects.equals(DicDataEnum.DEV_CLD.getCode(),devType)?item.getDataTimeSec():item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD");
Point point = influxDbUtils.pointBuilder(tableName, flag?item.getDataTimeSec():item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point); batchPoints.point(point);
records.add(batchPoints.lineProtocol()); records.add(batchPoints.lineProtocol());

View File

@@ -260,6 +260,9 @@ public class SendEventUtils {
List<String> list = csDeviceUserFeignClient.findUserById(devId).getData(); List<String> list = csDeviceUserFeignClient.findUserById(devId).getData();
adminList.addAll(list); adminList.addAll(list);
} }
if (CollectionUtil.isNotEmpty(adminList)) {
adminList = adminList.stream().distinct().collect(Collectors.toList());
}
return adminList; return adminList;
} }