模拟直连设备在APP注册接入
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package com.njcn.access.handler;
|
||||
|
||||
import com.alibaba.excel.util.CollectionUtils;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.github.tocrhz.mqtt.annotation.MqttSubscribe;
|
||||
import com.github.tocrhz.mqtt.annotation.NamedValue;
|
||||
@@ -10,22 +11,17 @@ import com.njcn.access.enums.AccessResponseEnum;
|
||||
import com.njcn.access.enums.TypeEnum;
|
||||
import com.njcn.access.pojo.dto.DevModInfoDto;
|
||||
import com.njcn.access.pojo.dto.ModelDto;
|
||||
import com.njcn.access.pojo.dto.TopicDto;
|
||||
import com.njcn.access.pojo.dto.ReqAndResDto;
|
||||
import com.njcn.access.pojo.dto.heart.HeartBeatDto;
|
||||
import com.njcn.access.pojo.param.ReqAndResParam;
|
||||
import com.njcn.access.pojo.po.CsTopicPO;
|
||||
import com.njcn.access.service.ICsTopicService;
|
||||
import com.njcn.algorithm.api.DevModelFeignClient;
|
||||
import com.njcn.algorithm.api.EquipmentFeignClient;
|
||||
import com.njcn.algorithm.pojo.param.CsDevModelQueryListParm;
|
||||
import com.njcn.algorithm.pojo.vo.CsDevModelPageVO;
|
||||
import com.njcn.algorithm.pojo.vo.CsEquipmentDeliveryVO;
|
||||
import com.njcn.common.pojo.response.HttpResult;
|
||||
import com.njcn.access.pojo.po.CsLineModel;
|
||||
import com.njcn.access.service.ICsLineModelService;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.common.utils.PubUtils;
|
||||
import com.njcn.csdevice.api.DevModelFeignClient;
|
||||
import com.njcn.csdevice.api.EquipmentFeignClient;
|
||||
import com.njcn.csdevice.pojo.po.CsDevModelPO;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.api.DicDataFeignClient;
|
||||
import com.njcn.system.enums.DicDataEnum;
|
||||
import com.njcn.system.pojo.po.DictData;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
@@ -33,11 +29,11 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.validation.ConstraintViolation;
|
||||
import javax.validation.Validator;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* @author hongawen
|
||||
@@ -49,52 +45,52 @@ import java.util.*;
|
||||
@AllArgsConstructor
|
||||
public class MqttMessageHandler {
|
||||
|
||||
private final EquipmentFeignClient equipmentFeignClient;
|
||||
|
||||
private final DevModelFeignClient devModelFeignClient;
|
||||
|
||||
private final EquipmentFeignClient equipmentFeignClient;
|
||||
|
||||
private final DicDataFeignClient dicDataFeignClient;
|
||||
|
||||
private final ICsLineModelService csLineModelService;
|
||||
|
||||
private final MqttPublisher publisher;
|
||||
|
||||
private final RedisUtil redisUtil;
|
||||
|
||||
private final ICsTopicService csTopicService;
|
||||
|
||||
@Autowired
|
||||
Validator validator;
|
||||
|
||||
@MqttSubscribe(value = "/Dev/Topic/{edgeId}",qos = 1)
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){
|
||||
Gson gson = new Gson();
|
||||
ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class);
|
||||
//检验传递的参数是否准确
|
||||
Set<ConstraintViolation<ReqAndResParam.Res>> validate = validator.validate(res);
|
||||
validate.forEach(constraintViolation -> {
|
||||
System.out.println(constraintViolation.getMessage());
|
||||
});
|
||||
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
|
||||
if (Objects.equals(res.getType(), TypeEnum.TYPE_1.getCode())){
|
||||
List<CsTopicPO> list = new ArrayList<>();
|
||||
//fixme 这边获取数据需要调整
|
||||
Map<String,List<String>> map = (Map<String,List<String>>)res.getMsg();
|
||||
List<String> topicList = map.get("Topic");
|
||||
topicList.forEach(item->{
|
||||
CsTopicPO csTopicPo = new CsTopicPO();
|
||||
csTopicPo.setNdid(nDid);
|
||||
csTopicPo.setTopic(item);
|
||||
csTopicPo.setType(0);
|
||||
list.add(csTopicPo);
|
||||
});
|
||||
csTopicService.addList(list);
|
||||
} else {
|
||||
log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage());
|
||||
}
|
||||
}
|
||||
// @MqttSubscribe(value = "/Dev/Topic/{edgeId}",qos = 1)
|
||||
// @Transactional(rollbackFor = Exception.class)
|
||||
// public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){
|
||||
// Gson gson = new Gson();
|
||||
// ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class);
|
||||
// //检验传递的参数是否准确
|
||||
// Set<ConstraintViolation<ReqAndResParam.Res>> validate = validator.validate(res);
|
||||
// validate.forEach(constraintViolation -> {
|
||||
// System.out.println(constraintViolation.getMessage());
|
||||
// });
|
||||
// if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
|
||||
// if (Objects.equals(res.getType(), TypeEnum.TYPE_1.getCode())){
|
||||
// List<CsTopicPO> list = new ArrayList<>();
|
||||
// //fixme 这边获取数据需要调整
|
||||
// Map<String,List<String>> map = (Map<String,List<String>>)res.getMsg();
|
||||
// List<String> topicList = map.get("Topic");
|
||||
// topicList.forEach(item->{
|
||||
// CsTopicPO csTopicPo = new CsTopicPO();
|
||||
// csTopicPo.setNdid(nDid);
|
||||
// csTopicPo.setTopic(item);
|
||||
// csTopicPo.setType(0);
|
||||
// list.add(csTopicPo);
|
||||
// });
|
||||
// csTopicService.addList(list);
|
||||
// } else {
|
||||
// log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
|
||||
// }
|
||||
// } else {
|
||||
// log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage());
|
||||
// }
|
||||
// }
|
||||
|
||||
/**
|
||||
* 装置注册应答
|
||||
@@ -108,31 +104,26 @@ public class MqttMessageHandler {
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void devOperation(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){
|
||||
log.info("收到注册应答响应--->" + nDid);
|
||||
//这边用redis缓存来判断是否接收响应
|
||||
redisUtil.saveByKeyWithExpire(nDid,true,5L);
|
||||
Gson gson = new Gson();
|
||||
ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class);
|
||||
//检验传递的参数是否准确
|
||||
Set<ConstraintViolation<ReqAndResParam.Res>> validate = validator.validate(res);
|
||||
validate.forEach(constraintViolation -> {
|
||||
System.out.println(constraintViolation.getMessage());
|
||||
});
|
||||
ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class);
|
||||
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
|
||||
if (Objects.equals(res.getType(),TypeEnum.TYPE_17.getCode())){
|
||||
equipmentFeignClient.updateStatusBynDid(nDid, AccessEnum.REGISTERED.getCode());
|
||||
//询问模板数据
|
||||
ReqAndResParam.Req reqAndResParam = new ReqAndResParam.Req();
|
||||
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
|
||||
reqAndResParam.setMid(1);
|
||||
reqAndResParam.setDid("0");
|
||||
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
||||
reqAndResParam.setType(TypeEnum.TYPE_3.getCode());
|
||||
reqAndResParam.setExpire(-1);
|
||||
publisher.send("/Pfm/DevCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false);
|
||||
//这边用redis缓存来判断是否接收响应
|
||||
redisUtil.saveByKey(nDid,AccessEnum.SUCCESS.getCode());
|
||||
} else {
|
||||
log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage());
|
||||
log.info(AccessResponseEnum.REGISTER_RESPONSE_ERROR.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,50 +142,31 @@ public class MqttMessageHandler {
|
||||
public void devModelOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){
|
||||
Gson gson = new Gson();
|
||||
ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class);
|
||||
HttpResult<CsEquipmentDeliveryVO> pojo = equipmentFeignClient.queryEquipmentByndid(nDid);
|
||||
if (!Objects.isNull(pojo)){
|
||||
String devType = pojo.getData().getDevModel();
|
||||
String dictCode = dicDataFeignClient.getDicDataById(devType).getData().getCode();
|
||||
//直连设备处理
|
||||
if (Objects.equals(dictCode, DicDataEnum.CONNECT_DEV.getCode())){
|
||||
List<DevModInfoDto> list = modelDto.getDevMod();
|
||||
list.forEach(item->{
|
||||
DictData dicData = dicDataFeignClient.getDicDataByCode(item.getDevType()).getData();
|
||||
CsDevModelQueryListParm csDevModelQueryListParm = new CsDevModelQueryListParm();
|
||||
if (Objects.isNull(dicData)) {
|
||||
log.info(AccessResponseEnum.DEV_TYPE_NOT_FIND.getMessage());
|
||||
return;
|
||||
} else {
|
||||
csDevModelQueryListParm.setDevType(dicData.getId());
|
||||
}
|
||||
csDevModelQueryListParm.setVersionNo(item.getVersionNo());
|
||||
csDevModelQueryListParm.setVersionDate(item.getVersionDate());
|
||||
CsDevModelPageVO csDevModelPageVO = devModelFeignClient.queryDevModelOne(csDevModelQueryListParm).getData();
|
||||
if (Objects.isNull(csDevModelPageVO)){
|
||||
log.info(AccessResponseEnum.MODEL_NO_FIND.getMessage());
|
||||
} else {
|
||||
ReqAndResParam.Req reqAndResParam = new ReqAndResParam.Req();
|
||||
reqAndResParam.setMid(1);
|
||||
reqAndResParam.setDid("0");
|
||||
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
||||
reqAndResParam.setType(TypeEnum.TYPE_5.getCode());
|
||||
reqAndResParam.setExpire(-1);
|
||||
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
|
||||
//将装置状态改为接入
|
||||
equipmentFeignClient.updateStatusBynDid(nDid, AccessEnum.ACCESS.getCode());
|
||||
//设置心跳时间,超时改为掉线
|
||||
redisUtil.saveByKeyWithExpire("MQTT:" + nDid,Instant.now().toEpochMilli(),180L);
|
||||
}
|
||||
});
|
||||
}
|
||||
//网关处理 生成报文下发给装置,装置响应版本
|
||||
else if (Objects.equals(dictCode, DicDataEnum.GATEWAY_DEV.getCode())){
|
||||
//todo 处理待定
|
||||
System.out.println("网关设备判断");
|
||||
}
|
||||
} else {
|
||||
log.info(AccessResponseEnum.NDID_NO_FIND.getMessage());
|
||||
List<DevModInfoDto> list = modelDto.getMsg().getDevMod();
|
||||
if (CollectionUtils.isEmpty(list)){
|
||||
log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage());
|
||||
throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR);
|
||||
}
|
||||
//校验前置传递的装置模板库中是否存在
|
||||
list.forEach(item->{
|
||||
CsDevModelPO po = devModelFeignClient.findModel(item.getDevType(),item.getVersionNo(),item.getVersionDate()).getData();
|
||||
if (Objects.isNull(po)){
|
||||
log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage());
|
||||
throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND);
|
||||
}
|
||||
});
|
||||
//fixme 这边先写死模板id,后期要选择电能质量的模板来确定监测点个数
|
||||
String modelId = "1";
|
||||
List<CsLineModel> lineList = csLineModelService.getMonitorNumByModelId(modelId);
|
||||
String key = "LINE" + nDid;
|
||||
//存储监测点模板信息,用于界面回显
|
||||
redisUtil.saveByKey(key,lineList);
|
||||
//存储模板id
|
||||
//todo 这边也是要调整的
|
||||
String key2 = "MODEL" + nDid;
|
||||
redisUtil.saveByKey(key2,modelId);
|
||||
redisUtil.delete(nDid);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -210,7 +182,7 @@ public class MqttMessageHandler {
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void devHeartBeat(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload) {
|
||||
//响应请求
|
||||
ReqAndResParam.Req reqAndResParam = new ReqAndResParam.Req();
|
||||
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
|
||||
HeartBeatDto heartBeatDto = new HeartBeatDto();
|
||||
heartBeatDto.setTime(System.currentTimeMillis()/1000);
|
||||
reqAndResParam.setMid(1);
|
||||
@@ -221,7 +193,7 @@ public class MqttMessageHandler {
|
||||
reqAndResParam.setMsg(heartBeatDto);
|
||||
publisher.send("/Dev/PfmRsp/"+version+"/"+nDid,PubUtils.obj2json(reqAndResParam),1,false);
|
||||
//处理业务逻辑
|
||||
ReqAndResParam.Res res = PubUtils.json2obj(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class);
|
||||
ReqAndResDto.Res res = PubUtils.json2obj(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class);
|
||||
Object object = res.getMsg();
|
||||
if (!Objects.isNull(object)){
|
||||
List<String> abnormalList = new ArrayList<>();
|
||||
|
||||
Reference in New Issue
Block a user