MQTT通讯功能联调
This commit is contained in:
@@ -9,9 +9,7 @@ import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||
import com.njcn.access.enums.AccessEnum;
|
||||
import com.njcn.access.enums.AccessResponseEnum;
|
||||
import com.njcn.access.enums.TypeEnum;
|
||||
import com.njcn.access.pojo.dto.DevModInfoDto;
|
||||
import com.njcn.access.pojo.dto.ModelDto;
|
||||
import com.njcn.access.pojo.dto.ReqAndResDto;
|
||||
import com.njcn.access.pojo.dto.*;
|
||||
import com.njcn.access.pojo.dto.heart.HeartBeatDto;
|
||||
import com.njcn.access.pojo.param.ReqAndResParam;
|
||||
import com.njcn.access.pojo.po.CsLineModel;
|
||||
@@ -21,6 +19,7 @@ import com.njcn.access.service.ICsTopicService;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.common.utils.PubUtils;
|
||||
import com.njcn.csdevice.api.DevModelFeignClient;
|
||||
import com.njcn.csdevice.api.EquipmentFeignClient;
|
||||
import com.njcn.csdevice.pojo.po.CsDevModelPO;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import lombok.AllArgsConstructor;
|
||||
@@ -33,7 +32,9 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
import javax.validation.ConstraintViolation;
|
||||
import javax.validation.Validator;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author hongawen
|
||||
@@ -45,6 +46,8 @@ import java.util.*;
|
||||
@AllArgsConstructor
|
||||
public class MqttMessageHandler {
|
||||
|
||||
private final EquipmentFeignClient equipmentFeignClient;
|
||||
|
||||
private final DevModelFeignClient devModelFeignClient;
|
||||
|
||||
private final ICsLineModelService csLineModelService;
|
||||
@@ -58,7 +61,7 @@ public class MqttMessageHandler {
|
||||
@Autowired
|
||||
Validator validator;
|
||||
|
||||
@MqttSubscribe(value = "/Dev/Topic/{edgeId}",qos = 1)
|
||||
@MqttSubscribe(value = "/Dev/DevTopic/{edgeId}",qos = 1)
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){
|
||||
Gson gson = new Gson();
|
||||
@@ -69,16 +72,18 @@ public class MqttMessageHandler {
|
||||
System.out.println(constraintViolation.getMessage());
|
||||
});
|
||||
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
|
||||
if (Objects.equals(res.getType(), TypeEnum.TYPE_1.getCode())){
|
||||
if (Objects.equals(res.getType(), Integer.parseInt(TypeEnum.TYPE_16.getCode()))){
|
||||
List<CsTopic> list = new ArrayList<>();
|
||||
//fixme 这边获取数据需要调整
|
||||
Map<String,List<String>> map = (Map<String,List<String>>)res.getMsg();
|
||||
List<String> topicList = map.get("Topic");
|
||||
topicList.forEach(item->{
|
||||
CsTopic csTopic = new CsTopic();
|
||||
csTopic.setNdid(nDid);
|
||||
csTopic.setNDid(nDid);
|
||||
csTopic.setTopic(item);
|
||||
csTopic.setType(0);
|
||||
String version = item.split("/")[3];
|
||||
if (version.startsWith("V")){
|
||||
csTopic.setVersion(version);
|
||||
}
|
||||
list.add(csTopic);
|
||||
});
|
||||
csTopicService.addTopic(nDid,list);
|
||||
@@ -105,15 +110,16 @@ public class MqttMessageHandler {
|
||||
Gson gson = new Gson();
|
||||
ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class);
|
||||
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
|
||||
if (Objects.equals(res.getType(),TypeEnum.TYPE_17.getCode())){
|
||||
if (Objects.equals(res.getType(),Integer.parseInt(TypeEnum.TYPE_17.getCode()))){
|
||||
//询问模板数据
|
||||
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
|
||||
reqAndResParam.setMid(1);
|
||||
reqAndResParam.setDid("0");
|
||||
reqAndResParam.setDid(0);
|
||||
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
||||
reqAndResParam.setType(TypeEnum.TYPE_3.getCode());
|
||||
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_3.getCode()));
|
||||
reqAndResParam.setExpire(-1);
|
||||
publisher.send("/Pfm/DevCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false);
|
||||
String version = csTopicService.getVersion(nDid);
|
||||
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
|
||||
} else {
|
||||
log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
|
||||
}
|
||||
@@ -138,27 +144,67 @@ public class MqttMessageHandler {
|
||||
Gson gson = new Gson();
|
||||
ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class);
|
||||
List<DevModInfoDto> list = modelDto.getMsg().getDevMod();
|
||||
List<DevCfgDto> list2 = modelDto.getMsg().getDevCfg();
|
||||
if (CollectionUtils.isEmpty(list)){
|
||||
log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage());
|
||||
throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR);
|
||||
}
|
||||
//校验前置传递的装置模板库中是否存在
|
||||
List<String> modelId = new ArrayList<>();
|
||||
List<CsModelDto> modelList = new ArrayList<>();
|
||||
list.forEach(item->{
|
||||
Integer did = null;
|
||||
for (DevCfgDto item2 : list2) {
|
||||
if (Objects.equals(item.getDevType(),item2.getDevType())){
|
||||
did = item2.getDid();
|
||||
}
|
||||
}
|
||||
CsModelDto csModelDto = new CsModelDto();
|
||||
CsDevModelPO po = devModelFeignClient.findModel(item.getDevType(),item.getVersionNo(),item.getVersionDate()).getData();
|
||||
if (Objects.isNull(po)){
|
||||
log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage());
|
||||
throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND);
|
||||
}
|
||||
modelId.add(po.getId());
|
||||
csModelDto.setDevType(po.getDevTypeName());
|
||||
csModelDto.setModelId(po.getId());
|
||||
csModelDto.setDid(did);
|
||||
modelList.add(csModelDto);
|
||||
});
|
||||
List<CsLineModel> lineList = csLineModelService.getMonitorNumByModelId(modelId);
|
||||
String key = "LINE" + nDid;
|
||||
//存储监测点模板信息,用于界面回显
|
||||
redisUtil.saveByKeyWithExpire(key,lineList,600L);
|
||||
//存储模板id
|
||||
String key2 = "MODEL" + nDid;
|
||||
redisUtil.saveByKeyWithExpire(key2,modelId,600L);
|
||||
redisUtil.saveByKeyWithExpire(key2,modelList,600L);
|
||||
//存储监测点模板信息,用于界面回显
|
||||
List<String> modelId = modelList.stream().map(CsModelDto::getModelId).collect(Collectors.toList());
|
||||
List<CsLineModel> lineList = csLineModelService.getMonitorNumByModelId(modelId);
|
||||
String key = "LINE" + nDid;
|
||||
redisUtil.saveByKeyWithExpire(key,lineList,600L);
|
||||
}
|
||||
|
||||
/**
|
||||
* 设备接入平台应答
|
||||
* @param topic
|
||||
* @param message
|
||||
* @param version
|
||||
* @param nDid
|
||||
* @param payload
|
||||
*/
|
||||
@MqttSubscribe(value = "/Pfm/DevRsp/{version}/{edgeId}",qos = 1)
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void devAccessOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){
|
||||
log.info("收到接入应答响应--->" + nDid);
|
||||
Gson gson = new Gson();
|
||||
ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class);
|
||||
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
|
||||
if (Objects.equals(res.getType(),Integer.parseInt(TypeEnum.TYPE_20.getCode()))){
|
||||
//修改装置状态
|
||||
equipmentFeignClient.updateStatusBynDid(nDid, AccessEnum.ACCESS.getCode());
|
||||
//装置接入之后再设置心跳时间,超时改为掉线
|
||||
redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L);
|
||||
} else {
|
||||
log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -178,9 +224,9 @@ public class MqttMessageHandler {
|
||||
HeartBeatDto heartBeatDto = new HeartBeatDto();
|
||||
heartBeatDto.setTime(System.currentTimeMillis()/1000);
|
||||
reqAndResParam.setMid(1);
|
||||
reqAndResParam.setDid("0");
|
||||
reqAndResParam.setDid(0);
|
||||
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
||||
reqAndResParam.setType(TypeEnum.TYPE_14.getCode());
|
||||
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_14.getCode()));
|
||||
reqAndResParam.setExpire(-1);
|
||||
reqAndResParam.setMsg(heartBeatDto);
|
||||
publisher.send("/Dev/PfmRsp/"+version+"/"+nDid,PubUtils.obj2json(reqAndResParam),1,false);
|
||||
|
||||
@@ -21,4 +21,12 @@ public interface ICsTopicService extends IService<CsTopic> {
|
||||
* @param list 主题集合
|
||||
*/
|
||||
void addTopic(String nDid,List<CsTopic> list);
|
||||
|
||||
/**
|
||||
* 获取设备的协议版本
|
||||
* @param nDid 网络设备码
|
||||
*/
|
||||
String getVersion(String nDid);
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -765,6 +765,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
|
||||
if (Objects.isNull(eleEpdPqd)){
|
||||
throw new BusinessException(AccessResponseEnum.DICT_MISSING);
|
||||
}
|
||||
// M 代表没有数据,因为influxDB要录入数据,此字段是主键,给个默认值
|
||||
if (!Objects.isNull(eleEpdPqd.getHarmStart()) && !Objects.isNull(eleEpdPqd.getHarmEnd())){
|
||||
if (Objects.equals(eleEpdPqd.getHarmStart(),1)){
|
||||
for (int i = eleEpdPqd.getHarmStart(); i <= eleEpdPqd.getHarmEnd(); i++) {
|
||||
|
||||
@@ -1,17 +1,16 @@
|
||||
package com.njcn.access.service.impl;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.TypeReference;
|
||||
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||
import com.njcn.access.enums.AccessEnum;
|
||||
import com.njcn.access.enums.AccessResponseEnum;
|
||||
import com.njcn.access.enums.TypeEnum;
|
||||
import com.njcn.access.param.DevAccessParam;
|
||||
import com.njcn.access.pojo.dto.AccessDto;
|
||||
import com.njcn.access.pojo.dto.CsModelDto;
|
||||
import com.njcn.access.pojo.dto.ReqAndResDto;
|
||||
import com.njcn.access.service.ICsDeviceService;
|
||||
import com.njcn.access.service.ICsTopicService;
|
||||
import com.njcn.access.utils.MqttUtil;
|
||||
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
@@ -34,6 +33,7 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@@ -72,6 +72,8 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
|
||||
private final MqttUtil mqttUtil;
|
||||
|
||||
private final ICsTopicService csTopicService;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = {Exception.class})
|
||||
public void devRegister(String nDid) {
|
||||
@@ -90,12 +92,15 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
throw new BusinessException(AccessResponseEnum.DEV_IS_NOT_ZL);
|
||||
}
|
||||
//3.判断客户端是否在线
|
||||
//mqttUtil.judgeClientOnline(nDid);
|
||||
boolean mqttClient = mqttUtil.judgeClientOnline("access-boot1234567");
|
||||
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
|
||||
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
|
||||
if (!mqttClient){
|
||||
throw new BusinessException(AccessResponseEnum.MISSING_CLIENT);
|
||||
}
|
||||
//3.MQTT询问装置用的模板,并判断库中是否存在模板
|
||||
//4.询问设备支持的主题信息
|
||||
//将支持的主题入库
|
||||
askTopic(nDid);
|
||||
//5.MQTT询问装置用的模板,并判断库中是否存在模板
|
||||
//存在则建立关系;不存在则告警出来
|
||||
SysDicTreePO dictData = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevModel()).getData();
|
||||
if (Objects.isNull(dictData)){
|
||||
@@ -137,14 +142,15 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
csLedgerParam.setSort(0);
|
||||
csLedgerFeignClient.add(csLedgerParam);
|
||||
//2.监测点表录入关系
|
||||
//todo 获取逻辑设备信息,更新监测点pt、ct相关信息 LdevInfo
|
||||
devAccessParam.getList().forEach(item->{
|
||||
String id = IdUtil.fastSimpleUUID();
|
||||
CsLinePO po = new CsLinePO();
|
||||
po.setLineId(id);
|
||||
po.setName(item.getName());
|
||||
po.setPosition(item.getPosition());
|
||||
po.setVolGrade(item.getVolGrade());
|
||||
//todo 目前ct、pt数据不确定 后期补
|
||||
//todo 目前电压等级、ct、pt数据不确定 后期补
|
||||
// po.setVolGrade(item.getVolGrade());
|
||||
// po.setPtRatio(item.getPtRatio());
|
||||
// po.setCtRatio(item.getCtRatio());
|
||||
po.setStatus(1);
|
||||
@@ -168,39 +174,75 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
//3.监测点拓扑图表录入关系
|
||||
csLineTopologyFeignClient.addList(appLineTopologyDiagramPoList);
|
||||
//4.新增装置-模板关系
|
||||
List<String> modelId = objectToList(redisUtil.getObjectByKey("MODEL" + devAccessParam.getNDid()));
|
||||
List<CsModelDto> modelId = objectToList(redisUtil.getObjectByKey("MODEL" + devAccessParam.getNDid()));
|
||||
modelId.forEach(item->{
|
||||
CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm();
|
||||
csDevModelRelationAddParm.setDevId(vo.getId());
|
||||
csDevModelRelationAddParm.setModelId(item);
|
||||
csDevModelRelationAddParm.setModelId(item.getModelId());
|
||||
csDevModelRelationAddParm.setDid(item.getDid());
|
||||
devModelRelationFeignClient.addDevModelRelation(csDevModelRelationAddParm);
|
||||
});
|
||||
redisUtil.delete("MODEL" + devAccessParam.getNDid());
|
||||
//5.修改装置状态
|
||||
equipmentFeignClient.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode());
|
||||
//6.装置接入之后再设置心跳时间,超时改为掉线
|
||||
//redisUtil.saveByKeyWithExpire("MQTT:" + devAccessParam.getNDid(), Instant.now().toEpochMilli(),180L);
|
||||
//7.绑定装置和人的关系
|
||||
//6.绑定装置和人的关系
|
||||
CsDeviceUserPO po = new CsDeviceUserPO();
|
||||
po.setPrimaryUserId(RequestUtil.getUserIndex());
|
||||
po.setStatus("1");
|
||||
po.setSubUserId(RequestUtil.getUserIndex());
|
||||
po.setDeviceId(vo.getId());
|
||||
csDeviceUserFeignClient.add(Collections.singletonList(po));
|
||||
//8.删除redis监测点模板信息
|
||||
//7.删除redis监测点模板信息
|
||||
redisUtil.delete("LINE" + devAccessParam.getNDid());
|
||||
//todo 9.记录操作日志
|
||||
//todo 录入软件信息 SoftInfo
|
||||
|
||||
//todo 9.记录注册日志
|
||||
|
||||
//发起自动接入请求
|
||||
devAccess(devAccessParam.getNDid());
|
||||
//todo 10.记录接入日志
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new BusinessException(CommonResponseEnum.FAIL);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void devAccess(String nDid) {
|
||||
String version = csTopicService.getVersion(nDid);
|
||||
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
|
||||
reqAndResParam.setMid(1);
|
||||
reqAndResParam.setDid(0);
|
||||
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
||||
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode()));
|
||||
reqAndResParam.setExpire(-1);
|
||||
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, PubUtils.obj2json(reqAndResParam),1,false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 平台对设备发起主题询问命令
|
||||
*/
|
||||
public void askTopic(String nDid) {
|
||||
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
|
||||
reqAndResParam.setMid(1);
|
||||
reqAndResParam.setDid(0);
|
||||
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
||||
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_1.getCode()));
|
||||
reqAndResParam.setExpire(-1);
|
||||
publisher.send("/Pfm/DevTopic/"+nDid, PubUtils.obj2json(reqAndResParam),1,false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 平台对设备发起注册命令
|
||||
* @param nDid
|
||||
* @param devType
|
||||
*/
|
||||
public void zhiLianRegister(String nDid,String devType) {
|
||||
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
|
||||
reqAndResParam.setMid(1);
|
||||
reqAndResParam.setDid("0");
|
||||
reqAndResParam.setDid(0);
|
||||
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
||||
reqAndResParam.setType(TypeEnum.TYPE_2.getCode());
|
||||
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_2.getCode()));
|
||||
reqAndResParam.setExpire(-1);
|
||||
AccessDto accessDto = new AccessDto();
|
||||
accessDto.setNDid(nDid);
|
||||
@@ -209,12 +251,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
publisher.send("/Pfm/DevReg/"+nDid, PubUtils.obj2json(reqAndResParam),1,false);
|
||||
}
|
||||
|
||||
public List<String> objectToList(Object object) {
|
||||
List<String> urlList = new ArrayList<>();
|
||||
public List<CsModelDto> objectToList(Object object) {
|
||||
List<CsModelDto> urlList = new ArrayList<>();
|
||||
if (object != null) {
|
||||
if (object instanceof ArrayList<?>) {
|
||||
for (Object o : (List<?>) object) {
|
||||
urlList.add((String) o);
|
||||
urlList.add((CsModelDto) o);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,11 +24,16 @@ public class CsTopicServiceImpl extends ServiceImpl<CsTopicMapper, CsTopic> impl
|
||||
@Override
|
||||
public void addTopic(String nDid, List<CsTopic> list) {
|
||||
LambdaQueryWrapper<CsTopic> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(CsTopic::getNdid,nDid);
|
||||
lambdaQueryWrapper.eq(CsTopic::getNDid,nDid);
|
||||
List<CsTopic> topics = this.baseMapper.selectList(lambdaQueryWrapper);
|
||||
if (CollectionUtil.isNotEmpty(topics)){
|
||||
this.remove(lambdaQueryWrapper);
|
||||
}
|
||||
this.saveBatch(list,1000);
|
||||
this.saveBatch(list);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getVersion(String nDid) {
|
||||
return this.lambdaQuery().eq(CsTopic::getNDid,nDid).isNotNull(CsTopic::getVersion).list().get(0).getVersion();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,4 +50,4 @@ mybatis-plus:
|
||||
|
||||
|
||||
mqtt:
|
||||
client-id: access-boot1234567
|
||||
client-id: @artifactId@${random.value}
|
||||
Reference in New Issue
Block a user