设备主题获取录入
This commit is contained in:
@@ -27,6 +27,7 @@ public enum AccessResponseEnum {
|
||||
DEV_IS_NOT_WG("A0303","注册装置不是网关!"),
|
||||
|
||||
REGISTER_RESPONSE_ERROR("A0304","装置注册,装置侧应答失败!"),
|
||||
RESPONSE_ERROR("A0304","装置请求响应错误!"),
|
||||
|
||||
DEV_TYPE_NOT_FIND("A0305","装置类型未找到,需要录入!"),
|
||||
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
package com.njcn.access.pojo.param;
|
||||
|
||||
import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
import javax.validation.constraints.NotBlank;
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 类的介绍:
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2023/4/20 14:05
|
||||
*/
|
||||
@Data
|
||||
public class ReqAndResParam implements Serializable {
|
||||
|
||||
@SerializedName("Mid")
|
||||
@ApiModelProperty("报文ID,在请求报文中该值为请求ID")
|
||||
@NotNull(message = "报文ID不能为空")
|
||||
private Integer mid;
|
||||
|
||||
@SerializedName("Did")
|
||||
@ApiModelProperty("设备唯一标识lDid,填入0代表nDid")
|
||||
@NotBlank(message = "设备唯一标识lDid不能为空")
|
||||
private String did;
|
||||
|
||||
@SerializedName("Pri")
|
||||
@ApiModelProperty("报文处理的优先级")
|
||||
@NotNull(message = "报文处理的优先级不能为空")
|
||||
private Integer pri;
|
||||
|
||||
@SerializedName("Type")
|
||||
@ApiModelProperty("消息类型")
|
||||
@NotNull(message = "消息类型不能为空")
|
||||
private String type;
|
||||
|
||||
@SerializedName("Msg")
|
||||
@ApiModelProperty("报文内容")
|
||||
private Object msg;
|
||||
|
||||
/**
|
||||
* 请求报文
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public static class Req extends ReqAndResParam {
|
||||
@SerializedName("Expire")
|
||||
@ApiModelProperty("此报文过期的相对时间,单位秒,该字段为-1时表示永不过期.控制类报文接收者超时处理按此时间")
|
||||
@NotNull(message = "报文过期的相对时间不能为空")
|
||||
private Integer expire;
|
||||
}
|
||||
|
||||
/**
|
||||
* 应答报文
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public static class Res extends ReqAndResParam {
|
||||
@SerializedName("Code")
|
||||
@ApiModelProperty("标识应答的返回码")
|
||||
@NotNull(message = "状态码不能为空")
|
||||
private Integer code;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.njcn.access.pojo.po;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.njcn.db.bo.BaseEntity;
|
||||
import java.io.Serializable;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author xuyang
|
||||
* @since 2023-07-13
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@TableName("cs_topic")
|
||||
public class CsTopic extends BaseEntity {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/**
|
||||
* id
|
||||
*/
|
||||
private String id;
|
||||
|
||||
/**
|
||||
* 设备识别码
|
||||
*/
|
||||
private String ndid;
|
||||
|
||||
/**
|
||||
* 主题名称
|
||||
*/
|
||||
private String topic;
|
||||
|
||||
/**
|
||||
* 主题类型
|
||||
*/
|
||||
private Integer type;
|
||||
|
||||
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
package com.njcn.access.pojo.po;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.sun.javafx.beans.IDProperty;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author xuyang
|
||||
* @since 2023-05-12
|
||||
*/
|
||||
@Data
|
||||
@TableName("cs_topic")
|
||||
public class CsTopicPO {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private String id;
|
||||
|
||||
private String ndid;
|
||||
|
||||
private String topic;
|
||||
|
||||
private Integer type;
|
||||
|
||||
|
||||
}
|
||||
@@ -13,8 +13,11 @@ import com.njcn.access.pojo.dto.DevModInfoDto;
|
||||
import com.njcn.access.pojo.dto.ModelDto;
|
||||
import com.njcn.access.pojo.dto.ReqAndResDto;
|
||||
import com.njcn.access.pojo.dto.heart.HeartBeatDto;
|
||||
import com.njcn.access.pojo.param.ReqAndResParam;
|
||||
import com.njcn.access.pojo.po.CsLineModel;
|
||||
import com.njcn.access.pojo.po.CsTopic;
|
||||
import com.njcn.access.service.ICsLineModelService;
|
||||
import com.njcn.access.service.ICsTopicService;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.common.utils.PubUtils;
|
||||
import com.njcn.csdevice.api.DevModelFeignClient;
|
||||
@@ -29,11 +32,10 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.validation.ConstraintViolation;
|
||||
import javax.validation.Validator;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* @author hongawen
|
||||
@@ -47,12 +49,10 @@ public class MqttMessageHandler {
|
||||
|
||||
private final DevModelFeignClient devModelFeignClient;
|
||||
|
||||
private final EquipmentFeignClient equipmentFeignClient;
|
||||
|
||||
private final DicDataFeignClient dicDataFeignClient;
|
||||
|
||||
private final ICsLineModelService csLineModelService;
|
||||
|
||||
private final ICsTopicService csTopicService;
|
||||
|
||||
private final MqttPublisher publisher;
|
||||
|
||||
private final RedisUtil redisUtil;
|
||||
@@ -60,37 +60,37 @@ public class MqttMessageHandler {
|
||||
@Autowired
|
||||
Validator validator;
|
||||
|
||||
// @MqttSubscribe(value = "/Dev/Topic/{edgeId}",qos = 1)
|
||||
// @Transactional(rollbackFor = Exception.class)
|
||||
// public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){
|
||||
// Gson gson = new Gson();
|
||||
// ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class);
|
||||
// //检验传递的参数是否准确
|
||||
// Set<ConstraintViolation<ReqAndResParam.Res>> validate = validator.validate(res);
|
||||
// validate.forEach(constraintViolation -> {
|
||||
// System.out.println(constraintViolation.getMessage());
|
||||
// });
|
||||
// if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
|
||||
// if (Objects.equals(res.getType(), TypeEnum.TYPE_1.getCode())){
|
||||
// List<CsTopicPO> list = new ArrayList<>();
|
||||
// //fixme 这边获取数据需要调整
|
||||
// Map<String,List<String>> map = (Map<String,List<String>>)res.getMsg();
|
||||
// List<String> topicList = map.get("Topic");
|
||||
// topicList.forEach(item->{
|
||||
// CsTopicPO csTopicPo = new CsTopicPO();
|
||||
// csTopicPo.setNdid(nDid);
|
||||
// csTopicPo.setTopic(item);
|
||||
// csTopicPo.setType(0);
|
||||
// list.add(csTopicPo);
|
||||
// });
|
||||
// csTopicService.addList(list);
|
||||
// } else {
|
||||
// log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
|
||||
// }
|
||||
// } else {
|
||||
// log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage());
|
||||
// }
|
||||
// }
|
||||
@MqttSubscribe(value = "/Dev/Topic/{edgeId}",qos = 1)
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){
|
||||
Gson gson = new Gson();
|
||||
ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class);
|
||||
//检验传递的参数是否准确
|
||||
Set<ConstraintViolation<ReqAndResParam.Res>> validate = validator.validate(res);
|
||||
validate.forEach(constraintViolation -> {
|
||||
System.out.println(constraintViolation.getMessage());
|
||||
});
|
||||
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
|
||||
if (Objects.equals(res.getType(), TypeEnum.TYPE_1.getCode())){
|
||||
List<CsTopic> list = new ArrayList<>();
|
||||
//fixme 这边获取数据需要调整
|
||||
Map<String,List<String>> map = (Map<String,List<String>>)res.getMsg();
|
||||
List<String> topicList = map.get("Topic");
|
||||
topicList.forEach(item->{
|
||||
CsTopic csTopic = new CsTopic();
|
||||
csTopic.setNdid(nDid);
|
||||
csTopic.setTopic(item);
|
||||
csTopic.setType(0);
|
||||
list.add(csTopic);
|
||||
});
|
||||
csTopicService.addTopic(nDid,list);
|
||||
} else {
|
||||
log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 装置注册应答
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.njcn.access.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.njcn.access.pojo.po.CsTopic;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Mapper 接口
|
||||
* </p>
|
||||
*
|
||||
* @author xuyang
|
||||
* @since 2023-07-13
|
||||
*/
|
||||
public interface CsTopicMapper extends BaseMapper<CsTopic> {
|
||||
|
||||
}
|
||||
@@ -1,8 +1,6 @@
|
||||
package com.njcn.access.service;
|
||||
|
||||
import com.njcn.access.param.DevAccessParam;
|
||||
import com.njcn.access.param.WgDeviceRegisterParam;
|
||||
import com.njcn.access.param.WgRegisterParam;
|
||||
|
||||
/**
|
||||
* 类的介绍:
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.njcn.access.service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import com.njcn.access.pojo.po.CsTopic;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 服务类
|
||||
* </p>
|
||||
*
|
||||
* @author xuyang
|
||||
* @since 2023-07-13
|
||||
*/
|
||||
public interface ICsTopicService extends IService<CsTopic> {
|
||||
|
||||
/**
|
||||
* 新增设备主题信息
|
||||
* @param nDid 网络设备码
|
||||
* @param list 主题集合
|
||||
*/
|
||||
void addTopic(String nDid,List<CsTopic> list);
|
||||
}
|
||||
@@ -185,8 +185,8 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
redisUtil.delete("MODEL" + devAccessParam.getNDid());
|
||||
//5.修改装置状态
|
||||
equipmentFeignClient.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode());
|
||||
//6.设置心跳时间,超时改为掉线
|
||||
redisUtil.saveByKeyWithExpire("MQTT:" + devAccessParam.getNDid(), Instant.now().toEpochMilli(),180L);
|
||||
//6.装置接入之后再设置心跳时间,超时改为掉线
|
||||
//redisUtil.saveByKeyWithExpire("MQTT:" + devAccessParam.getNDid(), Instant.now().toEpochMilli(),180L);
|
||||
//7.绑定装置和人的关系
|
||||
CsDeviceUserPO po = new CsDeviceUserPO();
|
||||
po.setPrimaryUserId(RequestUtil.getUserIndex());
|
||||
|
||||
@@ -106,6 +106,10 @@ public class CsGatewayServiceImpl extends ServiceImpl<CsGatewayMapper, CsGateway
|
||||
|
||||
@Override
|
||||
public void registerByWg(List<WgDeviceRegisterParam> list) {
|
||||
//判断网关是否在线
|
||||
//1.根据装置型号查询最新模板信息,发送给网关校验
|
||||
//2.网关给出应答,哪些可以接入,哪些无法接入。将可以接入的装置入库,不能接入的返回给前端
|
||||
//3.可以接入的设备填写监测点信息、选择拓扑图
|
||||
list.forEach(item->{
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.njcn.access.service.impl;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.njcn.access.mapper.CsTopicMapper;
|
||||
import com.njcn.access.pojo.po.CsTopic;
|
||||
import com.njcn.access.service.ICsTopicService;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 服务实现类
|
||||
* </p>
|
||||
*
|
||||
* @author xuyang
|
||||
* @since 2023-07-13
|
||||
*/
|
||||
@Service
|
||||
public class CsTopicServiceImpl extends ServiceImpl<CsTopicMapper, CsTopic> implements ICsTopicService {
|
||||
|
||||
@Override
|
||||
public void addTopic(String nDid, List<CsTopic> list) {
|
||||
LambdaQueryWrapper<CsTopic> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(CsTopic::getNdid,nDid);
|
||||
List<CsTopic> topics = this.baseMapper.selectList(lambdaQueryWrapper);
|
||||
if (CollectionUtil.isNotEmpty(topics)){
|
||||
this.remove(lambdaQueryWrapper);
|
||||
}
|
||||
this.saveBatch(list,1000);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user