装置接入流程调整

This commit is contained in:
xy
2024-08-16 13:00:22 +08:00
parent ab9215f13e
commit 90dc22021f
8 changed files with 195 additions and 133 deletions

View File

@@ -86,8 +86,12 @@ public class CsDeviceController extends BaseController {
@ApiImplicitParam(name = "nDid", value = "设备id", required = true)
public HttpResult<String> manualAccess(@RequestParam String nDid){
String methodDescribe = getMethodDescribe("manualAccess");
csDeviceService.manualAccess(nDid);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
boolean result = csDeviceService.manualAccess(nDid);
if (result) {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "成功", methodDescribe);
} else {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.FAIL, "失败", methodDescribe);
}
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)

View File

@@ -88,7 +88,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
try {
String version = csTopicService.getVersion(nDid);
//装置没有心跳,则立马发起接入请求
csDeviceService.devAccess(nDid,version);
csDeviceService.devAccessAskTemplate(nDid,version,1);
logDto.setOperate("装置掉线3分钟发送接入请求");
csLogsFeignClient.addUserLog(logDto);
Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
@@ -96,10 +96,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
throw new BusinessException(CommonResponseEnum.SUCCESS);
}
//心跳断连立马发起接入失败后1分钟再次发起请求请求3次
for (int i = 1; i < 4; i++) {
for (int i = 2; i < 5; i++) {
//接入再次失败,则定时发起接入请求
Thread.sleep(1000 * 60);
csDeviceService.devAccess(nDid,version);
csDeviceService.devAccessAskTemplate(nDid,version,i);
status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
break;
@@ -108,10 +108,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
csLogsFeignClient.addUserLog(logDto);
}
if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.OFFLINE.getCode())){
final int[] mid = {2};
final int[] mid = {5};
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
ScheduledFuture<?> runnableFuture = executor.scheduleAtFixedRate(() -> {
csDeviceService.devAccessMid(nDid,version, mid[0]);
csDeviceService.devAccessAskTemplate(nDid,version, mid[0]);
Integer status2 = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
if (Objects.equals(status2,AccessEnum.ONLINE.getCode())){
throw new BusinessException(CommonResponseEnum.SUCCESS);

View File

@@ -1,47 +1,47 @@
package com.njcn.access.runner;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
/**
* 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/28 13:57
*/
@Component
@Slf4j
public class AccessApplicationRunner implements ApplicationRunner {
@Resource
private CsDeviceServiceImpl csDeviceService;
@Resource
private ICsTopicService csTopicService;
@Resource
private ICsEquipmentDeliveryService csEquipmentDeliveryService;
@Override
public void run(ApplicationArguments args){
List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getAll();
list.forEach(item->{
String version = csTopicService.getVersion(item.getNdid());
if (!Objects.isNull(version)){
csDeviceService.devAccess(item.getNdid(),version);
}
});
}
}
//package com.njcn.access.runner;
//
//import com.njcn.access.service.ICsEquipmentDeliveryService;
//import com.njcn.access.service.ICsTopicService;
//import com.njcn.access.service.impl.CsDeviceServiceImpl;
//import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.boot.ApplicationArguments;
//import org.springframework.boot.ApplicationRunner;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.Resource;
//import java.util.List;
//import java.util.Objects;
//
///**
// * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入
// *
// * @author xuyang
// * @version 1.0.0
// * @createTime 2023/8/28 13:57
// */
//@Component
//@Slf4j
//public class AccessApplicationRunner implements ApplicationRunner {
//
// @Resource
// private CsDeviceServiceImpl csDeviceService;
//
// @Resource
// private ICsTopicService csTopicService;
//
// @Resource
// private ICsEquipmentDeliveryService csEquipmentDeliveryService;
//
// @Override
// public void run(ApplicationArguments args){
// List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getAll();
// list.forEach(item->{
// String version = csTopicService.getVersion(item.getNdid());
// if (!Objects.isNull(version)){
// csDeviceService.devAccess(item.getNdid(),version,1);
// }
// });
// }
//
//}

View File

@@ -1,50 +1,53 @@
package com.njcn.access.runner;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
/**
* 类的介绍:防止设备掉线 系统未能调整,做一个定时任务,每天凌晨将所有设备重新接入
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/28 14:21
*/
@Component
@Slf4j
public class AccessScheduledTask {
@Resource
private CsDeviceServiceImpl csDeviceService;
@Resource
private ICsTopicService csTopicService;
@Resource
private ICsEquipmentDeliveryService csEquipmentDeliveryService;
/**
* {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
*/
@Scheduled(cron = "0 0 0 * * ?")
public void executeTask() {
log.info("每日凌晨定时任务执行");
List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getAll();
list.forEach(item->{
String version = csTopicService.getVersion(item.getNdid());
if (!Objects.isNull(version)){
csDeviceService.devAccess(item.getNdid(),version);
}
});
}
}
//package com.njcn.access.runner;
//
//import cn.hutool.core.collection.CollUtil;
//import cn.hutool.core.collection.CollectionUtil;
//import com.njcn.access.service.ICsEquipmentDeliveryService;
//import com.njcn.access.service.ICsTopicService;
//import com.njcn.access.service.impl.CsDeviceServiceImpl;
//import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.scheduling.annotation.Scheduled;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.Resource;
//import java.util.List;
//import java.util.Objects;
//
///**
// * 类的介绍:防止设备掉线 系统未能调整,做一个定时任务,每天凌晨将所有设备重新接入
// *
// * @author xuyang
// * @version 1.0.0
// * @createTime 2023/8/28 14:21
// */
//@Component
//@Slf4j
//public class AccessScheduledTask {
//
// @Resource
// private CsDeviceServiceImpl csDeviceService;
//
// @Resource
// private ICsTopicService csTopicService;
//
// @Resource
// private ICsEquipmentDeliveryService csEquipmentDeliveryService;
//
// /**
// * {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
// */
// @Scheduled(cron = "0 0 0 * * ?")
// public void executeTask() {
// log.info("每日凌晨定时任务执行");
// List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getAll();
// if (CollUtil.isNotEmpty(list)) {
// for (int i = 0; i < list.size(); i++) {
// String version = csTopicService.getVersion(list.get(i).getNdid());
// if (!Objects.isNull(version)){
// csDeviceService.devAccess(list.get(i).getNdid(),version,i);
// }
// }
// }
// }
//}

View File

@@ -27,5 +27,10 @@ public interface ICsDevModelRelationService extends IService<CsDevModelRelationP
*/
CsDevModelRelationPO addDevModelRelation(CsDevModelRelationAddParm addParm);
/**
* 根据装置id删除模板信息
*/
void deleteByDeviceId(String deviceId);
}

View File

@@ -41,7 +41,7 @@ public interface ICsDeviceService {
* 手动发起接入
* @param nDid
*/
void manualAccess(String nDid);
boolean manualAccess(String nDid);
/**
* 直连设备注册

View File

@@ -1,5 +1,6 @@
package com.njcn.access.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.CsDevModelRelationMapper;
@@ -48,6 +49,14 @@ public class CsDevModelRelationServiceImpl extends ServiceImpl<CsDevModelRelatio
return csDevModelRelationPO;
}
@Override
public void deleteByDeviceId(String deviceId) {
LambdaQueryWrapper<CsDevModelRelationPO> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(CsDevModelRelationPO::getDevId, deviceId)
.eq (CsDevModelRelationPO::getStatus, 1);
this.baseMapper.delete(wrapper);
}
public List<CsDevModelRelationVO> queryDevModelRelation(CsDevModelRelationQueryParm queryParm) {
QueryWrapper<CsDevModelRelationPO> queryWrapper = new QueryWrapper<> ();
queryWrapper.eq (StringUtils.isNotBlank (queryParm.getId ()),"id",queryParm.getId ()).

View File

@@ -41,10 +41,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.*;
import java.util.stream.Collectors;
/**
@@ -288,7 +285,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
//6.修改装置状态
csEquipmentDeliveryService.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode());
//7.发起自动接入请求
devAccess(devAccessParam.getNDid(),version);
devAccessAskTemplate(devAccessParam.getNDid(),version,1);
//8.删除redis监测点模板信息
redisUtil.delete(AppRedisKey.MODEL + devAccessParam.getNDid());
@@ -360,9 +357,10 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
}
@Override
public void manualAccess(String nDid) {
@Transactional(rollbackFor = Exception.class)
public boolean manualAccess(String nDid) {
String version = csTopicService.getVersion(nDid);
devAccess(nDid,version);
return devAccessAskTemplate(nDid,version,new Random().nextInt(10000));
}
@@ -423,7 +421,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
//3.修改装置状态为注册状态
csEquipmentDeliveryService.updateStatusBynDid(nDid, AccessEnum.REGISTERED.getCode());
//4.发起自动接入请求
devAccess(nDid,version);
devAccessAskTemplate(nDid,version,1);
//5.存储日志
csLogsFeignClient.addUserLog(logDto);
//6.存储设备调试日志表
@@ -457,7 +455,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
//获取版本
String version = csTopicService.getVersion(nDid);
//发起接入请求
this.devAccess(nDid,version);
this.devAccessAskTemplate(nDid,version,1);
}
private void checkDeviceStatus(String nDid) {
@@ -511,27 +509,70 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
throw new BusinessException(responseEnum);
}
public void devAccess(String nDid,String version) {
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
reqAndResParam.setMid(1);
reqAndResParam.setDid(0);
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode()));
reqAndResParam.setExpire(-1);
logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam));
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
/**
* 装置重新接入系统,需要校验所用的模板
* @param nDid
* @param version
*/
@Transactional(rollbackFor = Exception.class)
public boolean devAccessAskTemplate(String nDid,String version,Integer mid) {
boolean result = false;
try {
//询问装置当前所用模板
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
reqAndResParam.setMid(mid);
reqAndResParam.setDid(0);
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_3.getCode()));
reqAndResParam.setExpire(-1);
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
//接收到模板,判断模板是否存在,替换模板,发起接入
Thread.sleep(2000);
List<CsModelDto> modelId = objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid));
if (CollUtil.isNotEmpty(modelId)) {
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
//删除之前的模板信息
csDevModelRelationService.deleteByDeviceId(vo.getId());
//重新录入装置和模板关系信息
for (CsModelDto item : modelId) {
CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm();
csDevModelRelationAddParm.setDevId(vo.getId());
csDevModelRelationAddParm.setModelId(item.getModelId());
csDevModelRelationAddParm.setDid(item.getDid());
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
}
//发起接入
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode()));
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
result = true;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return result;
}
public void devAccessMid(String nDid,String version,Integer mid) {
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
reqAndResParam.setMid(mid);
reqAndResParam.setDid(0);
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode()));
reqAndResParam.setExpire(-1);
logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam));
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
}
// public void devAccess(String nDid,String version) {
// ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
// reqAndResParam.setMid(1);
// reqAndResParam.setDid(0);
// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
// reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode()));
// reqAndResParam.setExpire(-1);
// logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam));
// publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
// }
//
// public void devAccessMid(String nDid,String version,Integer mid) {
// ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
// reqAndResParam.setMid(mid);
// reqAndResParam.setDid(0);
// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
// reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode()));
// reqAndResParam.setExpire(-1);
// logger.info("设备接入报文为:" + new Gson().toJson(reqAndResParam));
// publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false);
// }
/**
* 平台对设备发起主题询问命令