From 153a9ae1a59403c67e7ec4b2311f872c8d59ca04 Mon Sep 17 00:00:00 2001 From: xuyang <748613696@qq.com> Date: Thu, 13 Jul 2023 20:07:04 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=BE=E5=A4=87=E4=B8=BB=E9=A2=98=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E5=BD=95=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../njcn/access/enums/AccessResponseEnum.java | 1 + .../access/pojo/param/ReqAndResParam.java | 69 +++++++++++++++++ .../java/com/njcn/access/pojo/po/CsTopic.java | 45 +++++++++++ .../com/njcn/access/pojo/po/CsTopicPO.java | 30 -------- .../access/handler/MqttMessageHandler.java | 76 +++++++++---------- .../com/njcn/access/mapper/CsTopicMapper.java | 16 ++++ .../njcn/access/service/ICsDeviceService.java | 2 - .../njcn/access/service/ICsTopicService.java | 24 ++++++ .../service/impl/CsDeviceServiceImpl.java | 4 +- .../service/impl/CsGatewayServiceImpl.java | 4 + .../service/impl/CsTopicServiceImpl.java | 34 +++++++++ 11 files changed, 233 insertions(+), 72 deletions(-) create mode 100644 iot-access/access-api/src/main/java/com/njcn/access/pojo/param/ReqAndResParam.java create mode 100644 iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopic.java delete mode 100644 iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopicPO.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsTopicMapper.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index 24e8a0a..266da3a 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -27,6 +27,7 @@ public enum AccessResponseEnum { DEV_IS_NOT_WG("A0303","注册装置不是网关!"), REGISTER_RESPONSE_ERROR("A0304","装置注册,装置侧应答失败!"), + RESPONSE_ERROR("A0304","装置请求响应错误!"), DEV_TYPE_NOT_FIND("A0305","装置类型未找到,需要录入!"), diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/param/ReqAndResParam.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/param/ReqAndResParam.java new file mode 100644 index 0000000..96f115f --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/param/ReqAndResParam.java @@ -0,0 +1,69 @@ +package com.njcn.access.pojo.param; + +import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import java.io.Serializable; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/4/20 14:05 + */ +@Data +public class ReqAndResParam implements Serializable { + + @SerializedName("Mid") + @ApiModelProperty("报文ID,在请求报文中该值为请求ID") + @NotNull(message = "报文ID不能为空") + private Integer mid; + + @SerializedName("Did") + @ApiModelProperty("设备唯一标识lDid,填入0代表nDid") + @NotBlank(message = "设备唯一标识lDid不能为空") + private String did; + + @SerializedName("Pri") + @ApiModelProperty("报文处理的优先级") + @NotNull(message = "报文处理的优先级不能为空") + private Integer pri; + + @SerializedName("Type") + @ApiModelProperty("消息类型") + @NotNull(message = "消息类型不能为空") + private String type; + + @SerializedName("Msg") + @ApiModelProperty("报文内容") + private Object msg; + + /** + * 请求报文 + */ + @Data + @EqualsAndHashCode(callSuper = true) + public static class Req extends ReqAndResParam { + @SerializedName("Expire") + @ApiModelProperty("此报文过期的相对时间,单位秒,该字段为-1时表示永不过期.控制类报文接收者超时处理按此时间") + @NotNull(message = "报文过期的相对时间不能为空") + private Integer expire; + } + + /** + * 应答报文 + */ + @Data + @EqualsAndHashCode(callSuper = true) + public static class Res extends ReqAndResParam { + @SerializedName("Code") + @ApiModelProperty("标识应答的返回码") + @NotNull(message = "状态码不能为空") + private Integer code; + } +} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopic.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopic.java new file mode 100644 index 0000000..faae9b9 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopic.java @@ -0,0 +1,45 @@ +package com.njcn.access.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.njcn.db.bo.BaseEntity; +import java.io.Serializable; +import lombok.Getter; +import lombok.Setter; + +/** + *

+ * + *

+ * + * @author xuyang + * @since 2023-07-13 + */ +@Getter +@Setter +@TableName("cs_topic") +public class CsTopic extends BaseEntity { + + private static final long serialVersionUID = 1L; + + /** + * id + */ + private String id; + + /** + * 设备识别码 + */ + private String ndid; + + /** + * 主题名称 + */ + private String topic; + + /** + * 主题类型 + */ + private Integer type; + + +} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopicPO.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopicPO.java deleted file mode 100644 index 9b85cb0..0000000 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsTopicPO.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.njcn.access.pojo.po; - -import com.baomidou.mybatisplus.annotation.TableName; -import com.sun.javafx.beans.IDProperty; -import lombok.Data; - -/** - *

- * - *

- * - * @author xuyang - * @since 2023-05-12 - */ -@Data -@TableName("cs_topic") -public class CsTopicPO { - - private static final long serialVersionUID = 1L; - - private String id; - - private String ndid; - - private String topic; - - private Integer type; - - -} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index d868505..03f89fd 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -13,8 +13,11 @@ import com.njcn.access.pojo.dto.DevModInfoDto; import com.njcn.access.pojo.dto.ModelDto; import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.heart.HeartBeatDto; +import com.njcn.access.pojo.param.ReqAndResParam; import com.njcn.access.pojo.po.CsLineModel; +import com.njcn.access.pojo.po.CsTopic; import com.njcn.access.service.ICsLineModelService; +import com.njcn.access.service.ICsTopicService; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.utils.PubUtils; import com.njcn.csdevice.api.DevModelFeignClient; @@ -29,11 +32,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import javax.validation.ConstraintViolation; import javax.validation.Validator; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; +import java.util.*; /** * @author hongawen @@ -47,12 +49,10 @@ public class MqttMessageHandler { private final DevModelFeignClient devModelFeignClient; - private final EquipmentFeignClient equipmentFeignClient; - - private final DicDataFeignClient dicDataFeignClient; - private final ICsLineModelService csLineModelService; + private final ICsTopicService csTopicService; + private final MqttPublisher publisher; private final RedisUtil redisUtil; @@ -60,37 +60,37 @@ public class MqttMessageHandler { @Autowired Validator validator; -// @MqttSubscribe(value = "/Dev/Topic/{edgeId}",qos = 1) -// @Transactional(rollbackFor = Exception.class) -// public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){ -// Gson gson = new Gson(); -// ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class); -// //检验传递的参数是否准确 -// Set> validate = validator.validate(res); -// validate.forEach(constraintViolation -> { -// System.out.println(constraintViolation.getMessage()); -// }); -// if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ -// if (Objects.equals(res.getType(), TypeEnum.TYPE_1.getCode())){ -// List list = new ArrayList<>(); -// //fixme 这边获取数据需要调整 -// Map> map = (Map>)res.getMsg(); -// List topicList = map.get("Topic"); -// topicList.forEach(item->{ -// CsTopicPO csTopicPo = new CsTopicPO(); -// csTopicPo.setNdid(nDid); -// csTopicPo.setTopic(item); -// csTopicPo.setType(0); -// list.add(csTopicPo); -// }); -// csTopicService.addList(list); -// } else { -// log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); -// } -// } else { -// log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage()); -// } -// } + @MqttSubscribe(value = "/Dev/Topic/{edgeId}",qos = 1) + @Transactional(rollbackFor = Exception.class) + public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){ + Gson gson = new Gson(); + ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class); + //检验传递的参数是否准确 + Set> validate = validator.validate(res); + validate.forEach(constraintViolation -> { + System.out.println(constraintViolation.getMessage()); + }); + if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ + if (Objects.equals(res.getType(), TypeEnum.TYPE_1.getCode())){ + List list = new ArrayList<>(); + //fixme 这边获取数据需要调整 + Map> map = (Map>)res.getMsg(); + List topicList = map.get("Topic"); + topicList.forEach(item->{ + CsTopic csTopic = new CsTopic(); + csTopic.setNdid(nDid); + csTopic.setTopic(item); + csTopic.setType(0); + list.add(csTopic); + }); + csTopicService.addTopic(nDid,list); + } else { + log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage()); + } + } else { + log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage()); + } + } /** * 装置注册应答 diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsTopicMapper.java b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsTopicMapper.java new file mode 100644 index 0000000..a3752b2 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsTopicMapper.java @@ -0,0 +1,16 @@ +package com.njcn.access.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.access.pojo.po.CsTopic; + +/** + *

+ * Mapper 接口 + *

+ * + * @author xuyang + * @since 2023-07-13 + */ +public interface CsTopicMapper extends BaseMapper { + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDeviceService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDeviceService.java index 25eae19..f519fbb 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDeviceService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDeviceService.java @@ -1,8 +1,6 @@ package com.njcn.access.service; import com.njcn.access.param.DevAccessParam; -import com.njcn.access.param.WgDeviceRegisterParam; -import com.njcn.access.param.WgRegisterParam; /** * 类的介绍: diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java new file mode 100644 index 0000000..0177e75 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsTopicService.java @@ -0,0 +1,24 @@ +package com.njcn.access.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.access.pojo.po.CsTopic; + +import java.util.List; + +/** + *

+ * 服务类 + *

+ * + * @author xuyang + * @since 2023-07-13 + */ +public interface ICsTopicService extends IService { + + /** + * 新增设备主题信息 + * @param nDid 网络设备码 + * @param list 主题集合 + */ + void addTopic(String nDid,List list); +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 017bc02..c1c2dd8 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -185,8 +185,8 @@ public class CsDeviceServiceImpl implements ICsDeviceService { redisUtil.delete("MODEL" + devAccessParam.getNDid()); //5.修改装置状态 equipmentFeignClient.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode()); - //6.设置心跳时间,超时改为掉线 - redisUtil.saveByKeyWithExpire("MQTT:" + devAccessParam.getNDid(), Instant.now().toEpochMilli(),180L); + //6.装置接入之后再设置心跳时间,超时改为掉线 + //redisUtil.saveByKeyWithExpire("MQTT:" + devAccessParam.getNDid(), Instant.now().toEpochMilli(),180L); //7.绑定装置和人的关系 CsDeviceUserPO po = new CsDeviceUserPO(); po.setPrimaryUserId(RequestUtil.getUserIndex()); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsGatewayServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsGatewayServiceImpl.java index 302893c..3720e78 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsGatewayServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsGatewayServiceImpl.java @@ -106,6 +106,10 @@ public class CsGatewayServiceImpl extends ServiceImpl list) { + //判断网关是否在线 + //1.根据装置型号查询最新模板信息,发送给网关校验 + //2.网关给出应答,哪些可以接入,哪些无法接入。将可以接入的装置入库,不能接入的返回给前端 + //3.可以接入的设备填写监测点信息、选择拓扑图 list.forEach(item->{ diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java new file mode 100644 index 0000000..507d648 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsTopicServiceImpl.java @@ -0,0 +1,34 @@ +package com.njcn.access.service.impl; + +import cn.hutool.core.collection.CollectionUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.access.mapper.CsTopicMapper; +import com.njcn.access.pojo.po.CsTopic; +import com.njcn.access.service.ICsTopicService; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + *

+ * 服务实现类 + *

+ * + * @author xuyang + * @since 2023-07-13 + */ +@Service +public class CsTopicServiceImpl extends ServiceImpl implements ICsTopicService { + + @Override + public void addTopic(String nDid, List list) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(CsTopic::getNdid,nDid); + List topics = this.baseMapper.selectList(lambdaQueryWrapper); + if (CollectionUtil.isNotEmpty(topics)){ + this.remove(lambdaQueryWrapper); + } + this.saveBatch(list,1000); + } +}