diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index 30d8de0..9b100e2 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -28,11 +28,13 @@ public enum AccessResponseEnum { DEV_IS_NOT_WG("A0303","注册装置不是网关!"), REGISTER_RESPONSE_ERROR("A0304","装置注册,装置侧应答失败!"), + ACCESS_RESPONSE_ERROR("A0304","装置注册,装置侧应答失败!"), RESPONSE_ERROR("A0304","装置请求响应错误!"), DEV_TYPE_NOT_FIND("A0305","装置类型未找到,需要录入!"), REGISTER_ERROR("A0306","装置注册失败!"), + ACCESS_ERROR("A0306","装置接入失败!"), DICT_MISSING("A0307","字典数据缺失!"), EPD_DICT_MISSING("A0307","Epd字典数据缺失!"), diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java index 82d20f8..cdf4ee5 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java @@ -14,54 +14,54 @@ public enum TypeEnum { /** * 报文消息类型 */ - TYPE_1("0x2101","平台询问装置支持主题"), - TYPE_2("0x2102","平台向装置发送设备注册请求"), - TYPE_3("0x2103","平台询问设备模板信息"), - TYPE_4("0x2104","设备配置信息下发"), - TYPE_5("0x2105","设备接入命令"), - TYPE_6("0x2106","平台向设备发送数据命令"), - TYPE_7("0x2107","平台向设备发送控制命令"), - TYPE_8("0x2131","设备文件/目录信息询问"), - TYPE_9("0x2132","设备文件下载"), - TYPE_10("0x2133","设备文件上传"), - TYPE_11("0x2134","设备文件删除"), - TYPE_12("0x2135","设备目录创建"), - TYPE_13("0x2136","设备根目录查询"), - TYPE_14("0x2201","设备心跳应答 "), - TYPE_15("0x2401","设备数据主动上送应答"), - TYPE_16("0x1201","设备支持主题应答"), - TYPE_17("0x1202","联网装置应答注册请求"), - TYPE_18("0x1203","设备模板信息应答 "), - TYPE_19("0x1204","下发设备配置信息应答 "), - TYPE_20("0x1205","设备接入平台应答"), - TYPE_21("0x1206","平台向设备发送数据命令"), - TYPE_22("0x1207","设备控制命令应答"), - TYPE_23("0x1231","设备文件信息应答 "), - TYPE_24("0x1232","设备文件下载应答 "), - TYPE_25("0x1233","设备文件上传"), - TYPE_26("0x1234","设备文件删除"), - TYPE_27("0x1235","设备文件删除"), - TYPE_28("0x1236","设备根目录查询应答"), - TYPE_29("0x1101","设备心跳请求"), - TYPE_30("0x1301","设备数据主动上送"), + TYPE_1("8449","平台询问装置支持主题"), + TYPE_2("8450","平台向装置发送设备注册请求"), + TYPE_3("8451","平台询问设备模板信息"), + TYPE_4("8452","设备配置信息下发"), + TYPE_5("8453","设备接入命令"), + TYPE_6("8454","平台向设备发送数据命令"), + TYPE_7("8455","平台向设备发送控制命令"), + TYPE_8("8497","设备文件/目录信息询问"), + TYPE_9("8498","设备文件下载"), + TYPE_10("8499","设备文件上传"), + TYPE_11("8500","设备文件删除"), + TYPE_12("8501","设备目录创建"), + TYPE_13("8502","设备根目录查询"), + TYPE_14("8705","设备心跳应答 "), + TYPE_15("9217","设备数据主动上送应答"), + TYPE_16("4609","设备支持主题应答"), + TYPE_17("4610","联网装置应答注册请求"), + TYPE_18("4611","设备模板信息应答 "), + TYPE_19("4612","下发设备配置信息应答 "), + TYPE_20("4613","设备接入平台应答"), + TYPE_21("4614","平台向设备发送数据命令"), + TYPE_22("4615","设备控制命令应答"), + TYPE_23("4657","设备文件信息应答 "), + TYPE_24("4658","设备文件下载应答 "), + TYPE_25("4659","设备文件上传"), + TYPE_26("4660","设备文件删除"), + TYPE_27("4661","设备文件删除"), + TYPE_28("4662","设备根目录查询应答"), + TYPE_29("4353","设备心跳请求"), + TYPE_30("4865","设备数据主动上送"), /** * 数据类型 */ - DATA_1("0x01","软件信息SoftInfo"), - DATA_2("0x02","设备信息LdevInfo"), - DATA_3("0x03","电能数据Epd"), - DATA_4("0x04","电能质量数据Pqd"), - DATA_5("0x05","基础测量数据Bmd"), - DATA_6("0x06","事件Evt"), - DATA_7("0x07","告警Alm"), - DATA_8("0x08","状态Sts"), - DATA_9("0x08","开入Di"), - DATA_10("0x0A","开出Do"), - DATA_11("0x0B","参数Param"), - DATA_12("0x0C","定值Set"), - DATA_13("0x0D","内部定值InSet"), - DATA_14("0x0E","控制Ctrl"), + DATA_1("1","软件信息SoftInfo"), + DATA_2("2","设备信息LdevInfo"), + DATA_3("3","电能数据Epd"), + DATA_4("4","电能质量数据Pqd"), + DATA_5("5","基础测量数据Bmd"), + DATA_6("6","事件Evt"), + DATA_7("7","告警Alm"), + DATA_8("8","状态Sts"), + DATA_9("9","开入Di"), + DATA_10("10","开出Do"), + DATA_11("11","参数Param"), + DATA_12("12","定值Set"), + DATA_13("13","内部定值InSet"), + DATA_14("14","控制Ctrl"), /** * 数据模型列表 diff --git a/iot-access/access-api/src/main/java/com/njcn/access/param/DevAccessParam.java b/iot-access/access-api/src/main/java/com/njcn/access/param/DevAccessParam.java index 46d815f..43af1d3 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/param/DevAccessParam.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/param/DevAccessParam.java @@ -48,15 +48,12 @@ public class DevAccessParam implements Serializable { private String position; @ApiModelProperty("电压等级") - @NotNull(message = "电压等级不能为空") private String volGrade; @ApiModelProperty("PT变比") - @NotNull(message = "PT变比不能为空") private String ptRatio; @ApiModelProperty("CT变比") - @NotNull(message = "CT变比不能为空") private String ctRatio; @ApiModelProperty("中心点经度") diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/CsModelDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/CsModelDto.java new file mode 100644 index 0000000..f4e8524 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/CsModelDto.java @@ -0,0 +1,21 @@ +package com.njcn.access.pojo.dto; + +import lombok.Data; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/7 18:44 + */ +@Data +public class CsModelDto { + + private String devType; + + private String modelId; + + private Integer did; + +} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/DevCfgDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/DevCfgDto.java index 569309c..125ac50 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/DevCfgDto.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/DevCfgDto.java @@ -20,7 +20,7 @@ public class DevCfgDto implements Serializable { @SerializedName("Did") @ParamName("Did") @NotNull(message = "设备Id,不为空") - private String did; + private Integer did; @SerializedName("DevName") @ParamName("DevName") diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/ReqAndResDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/ReqAndResDto.java index 521ffd5..0f7e372 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/ReqAndResDto.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/ReqAndResDto.java @@ -28,7 +28,7 @@ public class ReqAndResDto implements Serializable { @SerializedName("Did") @ApiModelProperty("设备唯一标识lDid,填入0代表nDid") @NotBlank(message = "设备唯一标识lDid不能为空") - private String did; + private Integer did; @SerializedName("Pri") @ApiModelProperty("报文处理的优先级") @@ -38,7 +38,7 @@ public class ReqAndResDto implements Serializable { @SerializedName("Type") @ApiModelProperty("消息类型") @NotNull(message = "消息类型不能为空") - private String type; + private Integer type; @SerializedName("Msg") @ApiModelProperty("报文内容") diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/param/ReqAndResParam.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/param/ReqAndResParam.java index 96f115f..c7e5dfb 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/param/ReqAndResParam.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/param/ReqAndResParam.java @@ -37,7 +37,7 @@ public class ReqAndResParam implements Serializable { @SerializedName("Type") @ApiModelProperty("消息类型") @NotNull(message = "消息类型不能为空") - private String type; + private Integer type; @SerializedName("Msg") @ApiModelProperty("报文内容") diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsLineModel.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsLineModel.java index 171b775..871b6ce 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsLineModel.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsLineModel.java @@ -42,21 +42,4 @@ public class CsLineModel { * 安装位置 */ private String position; - - /** - * 电压等级 - */ - private String volGrade; - - /** - * PT变比 - */ - private String ptRatio; - - /** - * CT变比 - */ - private String ctRatio; - - } diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopic.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopic.java index faae9b9..85312e8 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopic.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopic.java @@ -1,10 +1,9 @@ package com.njcn.access.pojo.po; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; -import com.njcn.db.bo.BaseEntity; -import java.io.Serializable; -import lombok.Getter; -import lombok.Setter; +import lombok.Data; /** *

@@ -14,32 +13,35 @@ import lombok.Setter; * @author xuyang * @since 2023-07-13 */ -@Getter -@Setter +@Data @TableName("cs_topic") -public class CsTopic extends BaseEntity { +public class CsTopic { private static final long serialVersionUID = 1L; /** * id */ + @TableId(value = "id") private String id; /** * 设备识别码 */ - private String ndid; + @TableField(value = "ndid") + private String nDid; /** * 主题名称 */ + @TableField(value = "topic") private String topic; /** - * 主题类型 + * 协议版本 */ - private Integer type; + @TableField(value = "version") + private String version; } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 7466a23..0db674f 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -9,9 +9,7 @@ import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessResponseEnum; import com.njcn.access.enums.TypeEnum; -import com.njcn.access.pojo.dto.DevModInfoDto; -import com.njcn.access.pojo.dto.ModelDto; -import com.njcn.access.pojo.dto.ReqAndResDto; +import com.njcn.access.pojo.dto.*; import com.njcn.access.pojo.dto.heart.HeartBeatDto; import com.njcn.access.pojo.param.ReqAndResParam; import com.njcn.access.pojo.po.CsLineModel; @@ -21,6 +19,7 @@ import com.njcn.access.service.ICsTopicService; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.utils.PubUtils; import com.njcn.csdevice.api.DevModelFeignClient; +import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.pojo.po.CsDevModelPO; import com.njcn.redis.utils.RedisUtil; import lombok.AllArgsConstructor; @@ -33,7 +32,9 @@ import org.springframework.transaction.annotation.Transactional; import javax.validation.ConstraintViolation; import javax.validation.Validator; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.*; +import java.util.stream.Collectors; /** * @author hongawen @@ -45,6 +46,8 @@ import java.util.*; @AllArgsConstructor public class MqttMessageHandler { + private final EquipmentFeignClient equipmentFeignClient; + private final DevModelFeignClient devModelFeignClient; private final ICsLineModelService csLineModelService; @@ -58,7 +61,7 @@ public class MqttMessageHandler { @Autowired Validator validator; - @MqttSubscribe(value = "/Dev/Topic/{edgeId}",qos = 1) + @MqttSubscribe(value = "/Dev/DevTopic/{edgeId}",qos = 1) @Transactional(rollbackFor = Exception.class) public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){ Gson gson = new Gson(); @@ -69,16 +72,18 @@ public class MqttMessageHandler { System.out.println(constraintViolation.getMessage()); }); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ - if (Objects.equals(res.getType(), TypeEnum.TYPE_1.getCode())){ + if (Objects.equals(res.getType(), Integer.parseInt(TypeEnum.TYPE_16.getCode()))){ List list = new ArrayList<>(); - //fixme 这边获取数据需要调整 Map> map = (Map>)res.getMsg(); List topicList = map.get("Topic"); topicList.forEach(item->{ CsTopic csTopic = new CsTopic(); - csTopic.setNdid(nDid); + csTopic.setNDid(nDid); csTopic.setTopic(item); - csTopic.setType(0); + String version = item.split("/")[3]; + if (version.startsWith("V")){ + csTopic.setVersion(version); + } list.add(csTopic); }); csTopicService.addTopic(nDid,list); @@ -105,15 +110,16 @@ public class MqttMessageHandler { Gson gson = new Gson(); ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ - if (Objects.equals(res.getType(),TypeEnum.TYPE_17.getCode())){ + if (Objects.equals(res.getType(),Integer.parseInt(TypeEnum.TYPE_17.getCode()))){ //询问模板数据 ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); reqAndResParam.setMid(1); - reqAndResParam.setDid("0"); + reqAndResParam.setDid(0); reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); - reqAndResParam.setType(TypeEnum.TYPE_3.getCode()); + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_3.getCode())); reqAndResParam.setExpire(-1); - publisher.send("/Pfm/DevCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false); + String version = csTopicService.getVersion(nDid); + publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); } else { log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); } @@ -138,27 +144,67 @@ public class MqttMessageHandler { Gson gson = new Gson(); ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class); List list = modelDto.getMsg().getDevMod(); + List list2 = modelDto.getMsg().getDevCfg(); if (CollectionUtils.isEmpty(list)){ log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR); } //校验前置传递的装置模板库中是否存在 - List modelId = new ArrayList<>(); + List modelList = new ArrayList<>(); list.forEach(item->{ + Integer did = null; + for (DevCfgDto item2 : list2) { + if (Objects.equals(item.getDevType(),item2.getDevType())){ + did = item2.getDid(); + } + } + CsModelDto csModelDto = new CsModelDto(); CsDevModelPO po = devModelFeignClient.findModel(item.getDevType(),item.getVersionNo(),item.getVersionDate()).getData(); if (Objects.isNull(po)){ log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage()); throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND); } - modelId.add(po.getId()); + csModelDto.setDevType(po.getDevTypeName()); + csModelDto.setModelId(po.getId()); + csModelDto.setDid(did); + modelList.add(csModelDto); }); - List lineList = csLineModelService.getMonitorNumByModelId(modelId); - String key = "LINE" + nDid; - //存储监测点模板信息,用于界面回显 - redisUtil.saveByKeyWithExpire(key,lineList,600L); //存储模板id String key2 = "MODEL" + nDid; - redisUtil.saveByKeyWithExpire(key2,modelId,600L); + redisUtil.saveByKeyWithExpire(key2,modelList,600L); + //存储监测点模板信息,用于界面回显 + List modelId = modelList.stream().map(CsModelDto::getModelId).collect(Collectors.toList()); + List lineList = csLineModelService.getMonitorNumByModelId(modelId); + String key = "LINE" + nDid; + redisUtil.saveByKeyWithExpire(key,lineList,600L); + } + + /** + * 设备接入平台应答 + * @param topic + * @param message + * @param version + * @param nDid + * @param payload + */ + @MqttSubscribe(value = "/Pfm/DevRsp/{version}/{edgeId}",qos = 1) + @Transactional(rollbackFor = Exception.class) + public void devAccessOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){ + log.info("收到接入应答响应--->" + nDid); + Gson gson = new Gson(); + ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class); + if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ + if (Objects.equals(res.getType(),Integer.parseInt(TypeEnum.TYPE_20.getCode()))){ + //修改装置状态 + equipmentFeignClient.updateStatusBynDid(nDid, AccessEnum.ACCESS.getCode()); + //装置接入之后再设置心跳时间,超时改为掉线 + redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L); + } else { + log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); + } + } else { + log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage()); + } } @@ -178,9 +224,9 @@ public class MqttMessageHandler { HeartBeatDto heartBeatDto = new HeartBeatDto(); heartBeatDto.setTime(System.currentTimeMillis()/1000); reqAndResParam.setMid(1); - reqAndResParam.setDid("0"); + reqAndResParam.setDid(0); reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); - reqAndResParam.setType(TypeEnum.TYPE_14.getCode()); + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_14.getCode())); reqAndResParam.setExpire(-1); reqAndResParam.setMsg(heartBeatDto); publisher.send("/Dev/PfmRsp/"+version+"/"+nDid,PubUtils.obj2json(reqAndResParam),1,false); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java index 0177e75..d8923e5 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java @@ -21,4 +21,12 @@ public interface ICsTopicService extends IService { * @param list 主题集合 */ void addTopic(String nDid,List list); + + /** + * 获取设备的协议版本 + * @param nDid 网络设备码 + */ + String getVersion(String nDid); + + } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index bf665bb..cf379e8 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -765,6 +765,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService { if (Objects.isNull(eleEpdPqd)){ throw new BusinessException(AccessResponseEnum.DICT_MISSING); } + // M 代表没有数据,因为influxDB要录入数据,此字段是主键,给个默认值 if (!Objects.isNull(eleEpdPqd.getHarmStart()) && !Objects.isNull(eleEpdPqd.getHarmEnd())){ if (Objects.equals(eleEpdPqd.getHarmStart(),1)){ for (int i = eleEpdPqd.getHarmStart(); i <= eleEpdPqd.getHarmEnd(); i++) { diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index d9e235a..2e4c2ad 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -1,17 +1,16 @@ package com.njcn.access.service.impl; import cn.hutool.core.util.IdUtil; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.TypeReference; import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessResponseEnum; import com.njcn.access.enums.TypeEnum; import com.njcn.access.param.DevAccessParam; import com.njcn.access.pojo.dto.AccessDto; +import com.njcn.access.pojo.dto.CsModelDto; import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.service.ICsDeviceService; +import com.njcn.access.service.ICsTopicService; import com.njcn.access.utils.MqttUtil; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; @@ -34,6 +33,7 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -72,6 +72,8 @@ public class CsDeviceServiceImpl implements ICsDeviceService { private final MqttUtil mqttUtil; + private final ICsTopicService csTopicService; + @Override @Transactional(rollbackFor = {Exception.class}) public void devRegister(String nDid) { @@ -90,12 +92,15 @@ public class CsDeviceServiceImpl implements ICsDeviceService { throw new BusinessException(AccessResponseEnum.DEV_IS_NOT_ZL); } //3.判断客户端是否在线 - //mqttUtil.judgeClientOnline(nDid); - boolean mqttClient = mqttUtil.judgeClientOnline("access-boot1234567"); + String clientName = "NJCN-" + nDid.substring(nDid.length() - 6); + boolean mqttClient = mqttUtil.judgeClientOnline(clientName); if (!mqttClient){ throw new BusinessException(AccessResponseEnum.MISSING_CLIENT); } - //3.MQTT询问装置用的模板,并判断库中是否存在模板 + //4.询问设备支持的主题信息 + //将支持的主题入库 + askTopic(nDid); + //5.MQTT询问装置用的模板,并判断库中是否存在模板 //存在则建立关系;不存在则告警出来 SysDicTreePO dictData = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevModel()).getData(); if (Objects.isNull(dictData)){ @@ -137,14 +142,15 @@ public class CsDeviceServiceImpl implements ICsDeviceService { csLedgerParam.setSort(0); csLedgerFeignClient.add(csLedgerParam); //2.监测点表录入关系 + //todo 获取逻辑设备信息,更新监测点pt、ct相关信息 LdevInfo devAccessParam.getList().forEach(item->{ String id = IdUtil.fastSimpleUUID(); CsLinePO po = new CsLinePO(); po.setLineId(id); po.setName(item.getName()); po.setPosition(item.getPosition()); - po.setVolGrade(item.getVolGrade()); - //todo 目前ct、pt数据不确定 后期补 + //todo 目前电压等级、ct、pt数据不确定 后期补 +// po.setVolGrade(item.getVolGrade()); // po.setPtRatio(item.getPtRatio()); // po.setCtRatio(item.getCtRatio()); po.setStatus(1); @@ -168,39 +174,75 @@ public class CsDeviceServiceImpl implements ICsDeviceService { //3.监测点拓扑图表录入关系 csLineTopologyFeignClient.addList(appLineTopologyDiagramPoList); //4.新增装置-模板关系 - List modelId = objectToList(redisUtil.getObjectByKey("MODEL" + devAccessParam.getNDid())); + List modelId = objectToList(redisUtil.getObjectByKey("MODEL" + devAccessParam.getNDid())); modelId.forEach(item->{ CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm(); csDevModelRelationAddParm.setDevId(vo.getId()); - csDevModelRelationAddParm.setModelId(item); + csDevModelRelationAddParm.setModelId(item.getModelId()); + csDevModelRelationAddParm.setDid(item.getDid()); devModelRelationFeignClient.addDevModelRelation(csDevModelRelationAddParm); }); redisUtil.delete("MODEL" + devAccessParam.getNDid()); //5.修改装置状态 equipmentFeignClient.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode()); - //6.装置接入之后再设置心跳时间,超时改为掉线 - //redisUtil.saveByKeyWithExpire("MQTT:" + devAccessParam.getNDid(), Instant.now().toEpochMilli(),180L); - //7.绑定装置和人的关系 + //6.绑定装置和人的关系 CsDeviceUserPO po = new CsDeviceUserPO(); po.setPrimaryUserId(RequestUtil.getUserIndex()); po.setStatus("1"); po.setSubUserId(RequestUtil.getUserIndex()); po.setDeviceId(vo.getId()); csDeviceUserFeignClient.add(Collections.singletonList(po)); - //8.删除redis监测点模板信息 + //7.删除redis监测点模板信息 redisUtil.delete("LINE" + devAccessParam.getNDid()); - //todo 9.记录操作日志 + //todo 录入软件信息 SoftInfo + + //todo 9.记录注册日志 + + //发起自动接入请求 + devAccess(devAccessParam.getNDid()); + //todo 10.记录接入日志 + } catch (Exception e) { throw new BusinessException(CommonResponseEnum.FAIL); } } + + public void devAccess(String nDid) { + String version = csTopicService.getVersion(nDid); + ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); + reqAndResParam.setMid(1); + reqAndResParam.setDid(0); + reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); + reqAndResParam.setExpire(-1); + publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, PubUtils.obj2json(reqAndResParam),1,false); + } + + /** + * 平台对设备发起主题询问命令 + */ + public void askTopic(String nDid) { + ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); + reqAndResParam.setMid(1); + reqAndResParam.setDid(0); + reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_1.getCode())); + reqAndResParam.setExpire(-1); + publisher.send("/Pfm/DevTopic/"+nDid, PubUtils.obj2json(reqAndResParam),1,false); + } + + /** + * 平台对设备发起注册命令 + * @param nDid + * @param devType + */ public void zhiLianRegister(String nDid,String devType) { ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); reqAndResParam.setMid(1); - reqAndResParam.setDid("0"); + reqAndResParam.setDid(0); reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); - reqAndResParam.setType(TypeEnum.TYPE_2.getCode()); + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_2.getCode())); reqAndResParam.setExpire(-1); AccessDto accessDto = new AccessDto(); accessDto.setNDid(nDid); @@ -209,12 +251,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService { publisher.send("/Pfm/DevReg/"+nDid, PubUtils.obj2json(reqAndResParam),1,false); } - public List objectToList(Object object) { - List urlList = new ArrayList<>(); + public List objectToList(Object object) { + List urlList = new ArrayList<>(); if (object != null) { if (object instanceof ArrayList) { for (Object o : (List) object) { - urlList.add((String) o); + urlList.add((CsModelDto) o); } } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java index 507d648..15a3785 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java @@ -24,11 +24,16 @@ public class CsTopicServiceImpl extends ServiceImpl impl @Override public void addTopic(String nDid, List list) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(CsTopic::getNdid,nDid); + lambdaQueryWrapper.eq(CsTopic::getNDid,nDid); List topics = this.baseMapper.selectList(lambdaQueryWrapper); if (CollectionUtil.isNotEmpty(topics)){ this.remove(lambdaQueryWrapper); } - this.saveBatch(list,1000); + this.saveBatch(list); + } + + @Override + public String getVersion(String nDid) { + return this.lambdaQuery().eq(CsTopic::getNDid,nDid).isNotNull(CsTopic::getVersion).list().get(0).getVersion(); } } diff --git a/iot-access/access-boot/src/main/resources/bootstrap.yml b/iot-access/access-boot/src/main/resources/bootstrap.yml index 624eafa..eadfba5 100644 --- a/iot-access/access-boot/src/main/resources/bootstrap.yml +++ b/iot-access/access-boot/src/main/resources/bootstrap.yml @@ -50,4 +50,4 @@ mybatis-plus: mqtt: - client-id: access-boot1234567 \ No newline at end of file + client-id: @artifactId@${random.value} \ No newline at end of file