MQTT通讯功能联调

This commit is contained in:
2023-08-10 20:34:09 +08:00
parent 3412b0f0af
commit 02f5f7c031
29 changed files with 672 additions and 109 deletions

View File

@@ -21,6 +21,14 @@ public enum AccessEnum {
REGISTERED(2, "注册"),
ACCESS(3, "接入"),
/**
* 装置运行状态
* 0:删除 1:离线 2:在线
*/
DEL(0, "删除"),
OFFLINE(1, "离线"),
ONLINE(2, "在线"),
/**
* 报文处理优先级
*/

View File

@@ -42,7 +42,7 @@ public enum TypeEnum {
TYPE_26("4660","设备文件删除"),
TYPE_27("4661","设备文件删除"),
TYPE_28("4662","设备根目录查询应答"),
TYPE_29("4353","设备心跳请求"),
TYPE_29("9217","设备心跳请求"),
TYPE_30("4865","设备数据主动上送"),
/**

View File

@@ -1,6 +1,7 @@
package com.njcn.access.pojo.dto;
import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
import com.njcn.access.annotation.ParamName;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -23,25 +24,30 @@ public class ReqAndResDto implements Serializable {
@SerializedName("Mid")
@ApiModelProperty("报文ID,在请求报文中该值为请求ID")
@NotNull(message = "报文ID不能为空")
@ParamName("Mid")
private Integer mid;
@SerializedName("Did")
@ApiModelProperty("设备唯一标识lDid,填入0代表nDid")
@NotBlank(message = "设备唯一标识lDid不能为空")
@ParamName("Did")
private Integer did;
@SerializedName("Pri")
@ApiModelProperty("报文处理的优先级")
@NotNull(message = "报文处理的优先级不能为空")
@ParamName("Pri")
private Integer pri;
@SerializedName("Type")
@ApiModelProperty("消息类型")
@NotNull(message = "消息类型不能为空")
@ParamName("Type")
private Integer type;
@SerializedName("Msg")
@ApiModelProperty("报文内容")
@ParamName("Msg")
private Object msg;
/**
@@ -53,6 +59,7 @@ public class ReqAndResDto implements Serializable {
@SerializedName("Expire")
@ApiModelProperty("此报文过期的相对时间,单位秒,该字段为-1时表示永不过期.控制类报文接收者超时处理按此时间")
@NotNull(message = "报文过期的相对时间不能为空")
@ParamName("Expire")
private Integer expire;
}
@@ -65,6 +72,7 @@ public class ReqAndResDto implements Serializable {
@SerializedName("Code")
@ApiModelProperty("标识应答的返回码")
@NotNull(message = "状态码不能为空")
@ParamName("Code")
private Integer code;
}

View File

@@ -44,11 +44,11 @@ public class EpdPqdDto implements Serializable {
@SerializedName("HarmStart")
@ApiModelProperty("数据开始谐波次数")
private Integer harmStart;
private Double harmStart;
@SerializedName("HarmEnd")
@ApiModelProperty("数据结束谐波次数")
private Integer harmEnd;
private Double harmEnd;
@SerializedName("StatMethod")
@ApiModelProperty("数据统计方法(max,min,avg,cp95)")

View File

@@ -6,6 +6,7 @@ import lombok.Data;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Date;
/**
* <p>
@@ -46,7 +47,7 @@ public class CsSoftInfoPO extends BaseEntity {
/**
* 应用程序发布日期
*/
private LocalDateTime appDate;
private Date appDate;
/**
* 应用程序校验码

View File

@@ -9,6 +9,8 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.csdevice.api.CsGroupFeignClient;
import com.njcn.csdevice.enums.DeviceOperate;
import com.njcn.web.advice.DeviceLog;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
@@ -42,6 +44,7 @@ public class CsDevModelController extends BaseController {
@PostMapping("/addModel")
@ApiOperation("新增设备模板")
@Transactional(rollbackFor = {Exception.class})
@DeviceLog(operateType = DeviceOperate.ADD_MODEL)
public HttpResult<String> addModel(@RequestPart("file") @Validated MultipartFile file){
String methodDescribe = getMethodDescribe("addModel");
//1.录入通用字典

View File

@@ -1,5 +1,6 @@
package com.njcn.access.controller;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.param.DevAccessParam;
import com.njcn.access.service.ICsDeviceService;
import com.njcn.common.pojo.annotation.OperateInfo;
@@ -7,6 +8,7 @@ import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.enums.DeviceOperate;
import com.njcn.web.advice.DeviceLog;
import com.njcn.web.controller.BaseController;
@@ -35,6 +37,8 @@ public class CsDeviceController extends BaseController {
private final ICsDeviceService csDeviceService;
private final EquipmentFeignClient equipmentFeignClient;
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/register")
@ApiOperation("直连设备状态判断")
@@ -49,6 +53,7 @@ public class CsDeviceController extends BaseController {
@PostMapping("/model")
@ApiOperation("获取直连设备模板信息")
@ApiImplicitParam(name = "nDid", value = "设备识别码", required = true)
@DeviceLog(operateType = DeviceOperate.MODEL)
public HttpResult<Object> getModel(@RequestParam String nDid){
String methodDescribe = getMethodDescribe("getModel");
Object object = csDeviceService.getModel(nDid);
@@ -59,6 +64,7 @@ public class CsDeviceController extends BaseController {
@PostMapping("/access")
@ApiOperation("直连设备注册")
@ApiImplicitParam(name = "devAccessParam", value = "接入参数", required = true)
@DeviceLog(operateType = DeviceOperate.DEVICE_REGISTER)
public HttpResult<String> devAccess(@RequestBody @Validated DevAccessParam devAccessParam){
String methodDescribe = getMethodDescribe("getModel");
csDeviceService.devAccess(devAccessParam);

View File

@@ -3,7 +3,6 @@ package com.njcn.access.handler;
import com.alibaba.excel.util.CollectionUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.tocrhz.mqtt.annotation.MqttSubscribe;
import com.github.tocrhz.mqtt.annotation.NamedValue;
import com.github.tocrhz.mqtt.annotation.Payload;
@@ -13,17 +12,16 @@ import com.njcn.access.enums.AccessResponseEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.RspDataDto;
import com.njcn.access.pojo.dto.*;
import com.njcn.access.pojo.dto.devModel.LDevInfoDto;
import com.njcn.access.pojo.dto.heart.HeartBeatDto;
import com.njcn.access.pojo.param.ReqAndResParam;
import com.njcn.access.pojo.po.CsLineModel;
import com.njcn.access.pojo.po.CsTopic;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsLineModelService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.DevModelFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsDevModelPO;
import com.njcn.redis.utils.RedisUtil;
import lombok.AllArgsConstructor;
@@ -50,8 +48,6 @@ import java.util.stream.Collectors;
@AllArgsConstructor
public class MqttMessageHandler {
private final EquipmentFeignClient equipmentFeignClient;
private final DevModelFeignClient devModelFeignClient;
private final ICsLineModelService csLineModelService;
@@ -62,6 +58,8 @@ public class MqttMessageHandler {
private final RedisUtil redisUtil;
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
@Autowired
Validator validator;
@@ -200,13 +198,12 @@ public class MqttMessageHandler {
Gson gson = new Gson();
ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class);
switch (res.getType()){
case 8453:
case 4613:
log.info("收到接入应答响应--->" + nDid);
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
//修改装置状态
equipmentFeignClient.updateStatusBynDid(nDid, AccessEnum.ACCESS.getCode());
//装置接入之后再设置心跳时间,超时改为掉线
redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L);
csEquipmentDeliveryService.updateStatusBynDid(nDid, AccessEnum.ACCESS.getCode());
csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode());
} else {
log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage());
}
@@ -242,33 +239,48 @@ public class MqttMessageHandler {
* @param nDid
* @param payload
*/
@MqttSubscribe(value = "/Dev/PfmCmd/{version}/{edgeId}",qos = 1)
@MqttSubscribe(value = "/Dev/Data/{version}/{edgeId}",qos = 1)
@Transactional(rollbackFor = Exception.class)
public void devHeartBeat(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload) {
//解析数据
Gson gson = new Gson();
ReqAndResDto.Req res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Req.class);
//响应请求
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
switch (res.getType()){
case 4865:
//设置心跳时间,超时改为掉线
redisUtil.saveByKeyWithExpire("MQTT:" + nDid, Instant.now().toEpochMilli(),180L);
//处理心跳
ReqAndResDto.Res reqAndResParam = new ReqAndResDto.Res();
HeartBeatDto heartBeatDto = new HeartBeatDto();
heartBeatDto.setTime(System.currentTimeMillis()/1000);
reqAndResParam.setMid(1);
reqAndResParam.setDid(0);
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_14.getCode()));
reqAndResParam.setExpire(-1);
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_29.getCode()));
reqAndResParam.setCode(200);
reqAndResParam.setMsg(heartBeatDto);
publisher.send("/Dev/PfmRsp/"+version+"/"+nDid,PubUtils.obj2json(reqAndResParam),1,false);
publisher.send("/Dev/DataRsp/"+version+"/"+nDid,PubUtils.obj2json(reqAndResParam),1,false);
//处理业务逻辑
ReqAndResDto.Res res = PubUtils.json2obj(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class);
Object object = res.getMsg();
if (!Objects.isNull(object)){
List<String> abnormalList = new ArrayList<>();
if (object instanceof ArrayList<?>){
abnormalList.addAll((List<String>) object);
}
//todo 需要处理异常异常设备
//todo APF设备不存在逻辑设备掉线的情况网关下的设备会存在
abnormalList.forEach(item->{
System.out.println("异常设备ID"+item);
});
}
break;
case 4866:
//处理主动上送数据
break;
default:
break;
}
}
}

View File

@@ -1,8 +1,18 @@
package com.njcn.access.listener;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.AccessResponseEnum;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.access.utils.MqttUtil;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.enums.DeviceOperate;
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.web.advice.DeviceLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message;
@@ -12,6 +22,11 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author hongawen
@@ -23,10 +38,13 @@ import javax.annotation.Resource;
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
@Resource
private RedisUtil redisUtil;
private ICsTopicService csTopicService;
@Resource
private MqttPublisher publisher;
private ICsEquipmentDeliveryService csEquipmentDeliveryService;
@Resource
private CsDeviceServiceImpl csDeviceService;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
@@ -38,7 +56,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
* 注意message.toString()可以获取失效的key
*/
@Override
@Transactional(rollbackFor = {Exception.class})
@DeviceLog(operateType = DeviceOperate.DEVICE_OFFLINE)
public void onMessage(Message message, byte[] pattern) {
if (StringUtils.isBlank(message.toString())) {
return;
@@ -46,21 +64,29 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
//判断失效的key是否为MQTT消费端存入的
String expiredKey = message.toString();
if(expiredKey.startsWith("MQTT:")){
Gson gson = new Gson();
String ndid = expiredKey.split(":")[1];
//网关下线
// netDevService.offlineNetDev(ndid);
String result = (String) redisUtil.getObjectByKey(ndid);
String nDid = expiredKey.split(":")[1];
//装置下线
// List<String> list = modelService.monitorHeartbeat(ndid,result);
//生成通知报文
// DeviceOperateDTO deviceOperateDTO = new DeviceOperateDTO();
// deviceOperateDTO.setNdid(ndid);
// deviceOperateDTO.setDid(list);
// deviceOperateDTO.setTime(Long.toString(System.currentTimeMillis()/1000));
// deviceOperateDTO.setMessage("装置下线");
// deviceOperateDTO.setType("005");
// publisher.send("/device/platform",gson.toJson(deviceOperateDTO),1,false);
csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode());
//立马发起接入请求
String version = csTopicService.getVersion(nDid);
csDeviceService.devAccess(nDid,version);
//接入再次失败,则定时发起接入请求
try {
Thread.sleep(1000);
Integer status = csEquipmentDeliveryService.queryEquipmentByndid(nDid).getRunStatus();
if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.OFFLINE.getCode())){
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
ScheduledFuture<?> runnableFuture = executor.scheduleAtFixedRate(() -> {
csDeviceService.devAccess(nDid, version);
Integer status2 = csEquipmentDeliveryService.queryEquipmentByndid(nDid).getRunStatus();
if (Objects.equals(status2,AccessEnum.ONLINE.getCode())){
throw new BusinessException(CommonResponseEnum.SUCCESS);
}
}, 1, 3600, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

View File

@@ -0,0 +1,17 @@
package com.njcn.access.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.csdevice.pojo.po.AppLineTopologyDiagramPO;
/**
*
* Description:
* 接口文档访问地址http://serverIP:port/swagger-ui.html
* Date: 2023/3/27 10:18【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface AppLineTopologyDiagramMapper extends BaseMapper<AppLineTopologyDiagramPO> {
}

View File

@@ -0,0 +1,17 @@
package com.njcn.access.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.csdevice.pojo.po.CsDevModelRelationPO;
/**
*
* Description:
* 接口文档访问地址http://serverIP:port/swagger-ui.html
* Date: 2023/4/18 13:49【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface CsDevModelRelationMapper extends BaseMapper<CsDevModelRelationPO> {
}

View File

@@ -0,0 +1,15 @@
package com.njcn.access.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.csdevice.pojo.po.CsDeviceUserPO;
/**
*
* Description:
* Date: 2023/6/27 9:40【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface CsDeviceUserMapper extends BaseMapper<CsDeviceUserPO> {
}

View File

@@ -0,0 +1,17 @@
package com.njcn.access.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
/**
*
* Description:
* 接口文档访问地址http://serverIP:port/swagger-ui.html
* Date: 2023/3/30 16:23【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface CsEquipmentDeliveryMapper extends BaseMapper<CsEquipmentDeliveryPO> {
}

View File

@@ -0,0 +1,21 @@
package com.njcn.access.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.csdevice.pojo.dto.LineParamDTO;
import com.njcn.csdevice.pojo.po.CsLedger;
import com.njcn.csdevice.pojo.vo.CsLedgerVO;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* <p>
* 台账表 Mapper 接口
* </p>
*
* @author xuyang
* @since 2023-05-31
*/
public interface CsLedgerMapper extends BaseMapper<CsLedger> {
}

View File

@@ -0,0 +1,15 @@
package com.njcn.access.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.csdevice.pojo.po.CsLinePO;
/**
*
* Description:
* Date: 2023/5/18 14:01【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface CsLineMapper extends BaseMapper<CsLinePO> {
}

View File

@@ -0,0 +1,17 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.csdevice.pojo.po.AppLineTopologyDiagramPO;
/**
*
* Description:
* 接口文档访问地址http://serverIP:port/swagger-ui.html
* Date: 2023/3/27 10:18【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface IAppLineTopologyDiagramService extends IService<AppLineTopologyDiagramPO> {
}

View File

@@ -0,0 +1,31 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.csdevice.pojo.param.CsDevModelRelationAddParm;
import com.njcn.csdevice.pojo.param.CsDevModelRelationAuidtParm;
import com.njcn.csdevice.pojo.param.CsDevModelRelationQueryParm;
import com.njcn.csdevice.pojo.po.CsDevModelRelationPO;
import com.njcn.csdevice.pojo.vo.CsDevModelRelationVO;
import java.util.List;
/**
*
* Description:
* 接口文档访问地址http://serverIP:port/swagger-ui.html
* Date: 2023/4/18 13:49【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface ICsDevModelRelationService extends IService<CsDevModelRelationPO>{
/**
* 新增装置和模板的关系表
* @param addParm
* @return
*/
CsDevModelRelationPO addDevModelRelation(CsDevModelRelationAddParm addParm);
}

View File

@@ -0,0 +1,16 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.csdevice.pojo.po.CsDeviceUserPO;
/**
*
* Description:
* Date: 2023/6/27 9:40【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface ICsDeviceUserService extends IService<CsDeviceUserPO>{
}

View File

@@ -0,0 +1,50 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.csdevice.pojo.param.CsEquipmentDeliveryAddParm;
import com.njcn.csdevice.pojo.param.CsEquipmentDeliveryAuditParm;
import com.njcn.csdevice.pojo.param.CsEquipmentDeliveryQueryParm;
import com.njcn.csdevice.pojo.param.ProjectEquipmentQueryParm;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.csdevice.pojo.vo.DeviceManagerVO;
import com.njcn.csdevice.pojo.vo.ProjectEquipmentVO;
/**
*
* Description:
* 接口文档访问地址http://serverIP:port/swagger-ui.html
* Date: 2023/3/30 16:23【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface ICsEquipmentDeliveryService extends IService<CsEquipmentDeliveryPO>{
/**
* 根据网关id修改装置的状态
* @param nDid 网关id
*/
void updateStatusBynDid(String nDid,Integer status);
/**
* 根据网关id修改软件信息
* @param nDid 网关id
*/
void updateSoftInfoBynDid(String nDid,String id);
/**
* 根据网关id修改设备运行状态
* @param nDid 网关id
*/
void updateRunStatusBynDid(String nDid,Integer id);
/**
* 根据ndid查询装置信息
* @param ndid
* @return
*/
CsEquipmentDeliveryVO queryEquipmentByndid(String ndid);
}

View File

@@ -0,0 +1,27 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.csdevice.pojo.dto.LineParamDTO;
import com.njcn.csdevice.pojo.param.CsLedgerParam;
import com.njcn.csdevice.pojo.po.CsLedger;
import com.njcn.csdevice.pojo.vo.CsLedgerVO;
import java.util.List;
/**
* <p>
* 台账表 服务类
* </p>
*
* @author xuyang
* @since 2023-05-31
*/
public interface ICsLedgerService extends IService<CsLedger> {
/**
* 新增台账数据
* @param csLedgerParam
*/
CsLedger addLedgerTree(CsLedgerParam csLedgerParam);
}

View File

@@ -0,0 +1,16 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.csdevice.pojo.po.CsLinePO;
/**
*
* Description:
* Date: 2023/5/18 14:01【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface ICsLineService extends IService<CsLinePO>{
}

View File

@@ -0,0 +1,24 @@
package com.njcn.access.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.AppLineTopologyDiagramMapper;
import com.njcn.access.service.IAppLineTopologyDiagramService;
import com.njcn.csdevice.pojo.po.AppLineTopologyDiagramPO;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
*
* Description:
* 接口文档访问地址http://serverIP:port/swagger-ui.html
* Date: 2023/3/27 10:18【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
@RequiredArgsConstructor
public class AppLineTopologyDiagramServiceImpl extends ServiceImpl<AppLineTopologyDiagramMapper, AppLineTopologyDiagramPO> implements IAppLineTopologyDiagramService {
}

View File

@@ -0,0 +1,66 @@
package com.njcn.access.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.CsDevModelRelationMapper;
import com.njcn.access.service.ICsDevModelRelationService;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.enums.AlgorithmResponseEnum;
import com.njcn.csdevice.pojo.param.CsDevModelRelationAddParm;
import com.njcn.csdevice.pojo.param.CsDevModelRelationQueryParm;
import com.njcn.csdevice.pojo.po.CsDevModelRelationPO;
import com.njcn.csdevice.pojo.vo.CsDevModelRelationVO;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.stream.Collectors;
/**
*
* Description:
* 接口文档访问地址http://serverIP:port/swagger-ui.html
* Date: 2023/4/18 13:49【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
public class CsDevModelRelationServiceImpl extends ServiceImpl<CsDevModelRelationMapper, CsDevModelRelationPO> implements ICsDevModelRelationService {
@Override
@Transactional(rollbackFor = Exception.class)
public CsDevModelRelationPO addDevModelRelation(CsDevModelRelationAddParm addParm) {
CsDevModelRelationQueryParm queryParm = new CsDevModelRelationQueryParm();
queryParm.setDevId (addParm.getDevId ());
queryParm.setModelId (addParm.getModelId ());
queryParm.setStatus ("1");
List<CsDevModelRelationVO> csDevModelRelationVOS = this.queryDevModelRelation (queryParm);
if(csDevModelRelationVOS.size ()>0){
throw new BusinessException (AlgorithmResponseEnum.DATA_ERROR);
}
CsDevModelRelationPO csDevModelRelationPO = new CsDevModelRelationPO();
BeanUtils.copyProperties (addParm, csDevModelRelationPO);
csDevModelRelationPO.setStatus ("1");
this.save (csDevModelRelationPO);
return csDevModelRelationPO;
}
public List<CsDevModelRelationVO> queryDevModelRelation(CsDevModelRelationQueryParm queryParm) {
QueryWrapper<CsDevModelRelationPO> queryWrapper = new QueryWrapper<> ();
queryWrapper.eq (StringUtils.isNotBlank (queryParm.getId ()),"id",queryParm.getId ()).
eq (StringUtils.isNotBlank (queryParm.getModelId ()),"model_id",queryParm.getModelId ()).
eq (StringUtils.isNotBlank (queryParm.getDevId ()),"dev_id",queryParm.getDevId ()).
eq (StringUtils.isNotBlank (queryParm.getStatus ()),"status",queryParm.getStatus ());
List<CsDevModelRelationPO> csDevModelRelationPOS = this.getBaseMapper ( ).selectList (queryWrapper);
List<CsDevModelRelationVO> collect = csDevModelRelationPOS.stream ( ).map (temp -> {
CsDevModelRelationVO vo = new CsDevModelRelationVO ( );
BeanUtils.copyProperties (temp, vo);
return vo;
}).collect (Collectors.toList ( ));
return collect;
}
}

View File

@@ -378,8 +378,13 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
eleEpdPqdParam.setPhase(epd.getPhase());
}
eleEpdPqdParam.setUnit(epd.getUnit());
eleEpdPqdParam.setHarmStart(epd.getHarmStart());
eleEpdPqdParam.setHarmEnd(epd.getHarmEnd());
if (Objects.equals(epd.getHarmStart(),0.5) && Objects.equals(epd.getHarmEnd(),49.5)){
eleEpdPqdParam.setHarmStart((int)(epd.getHarmStart()+0.5));
eleEpdPqdParam.setHarmEnd((int)(epd.getHarmStart()+49.5));
} else {
eleEpdPqdParam.setHarmStart((int)(epd.getHarmStart()*1.0));
eleEpdPqdParam.setHarmEnd((int)(epd.getHarmStart()*1.0));
}
eleEpdPqdParam.setStatMethod(epd.getStatMethod());
eleEpdPqdParam.setDataType(id);
eleEpdPqdParam.setClassId(classId);
@@ -407,8 +412,13 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
eleEpdPqdParam.setPhase(pqd.getPhase());
}
eleEpdPqdParam.setUnit(pqd.getUnit());
eleEpdPqdParam.setHarmStart(pqd.getHarmStart());
eleEpdPqdParam.setHarmEnd(pqd.getHarmEnd());
if (Objects.equals(pqd.getHarmStart(),0.5) && Objects.equals(pqd.getHarmEnd(),49.5)){
eleEpdPqdParam.setHarmStart((int)(pqd.getHarmStart()+0.5));
eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmStart()+49.5));
} else {
eleEpdPqdParam.setHarmStart((int)(pqd.getHarmStart()*1.0));
eleEpdPqdParam.setHarmEnd((int)(pqd.getHarmStart()*1.0));
}
eleEpdPqdParam.setDataType(id);
eleEpdPqdParam.setClassId(classId);
result.add(eleEpdPqdParam);
@@ -763,6 +773,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
}
EleEpdPqd eleEpdPqd = epdFeignClient.findByParam(name,id,phase).getData();
if (Objects.isNull(eleEpdPqd)){
log.info("指标名称:"+name+",数据类型:"+id+",相别:"+phase);
throw new BusinessException(AccessResponseEnum.DICT_MISSING);
}
// M 代表没有数据,因为influxDB要录入数据此字段是主键给个默认值

View File

@@ -14,9 +14,7 @@ import com.njcn.access.pojo.dto.AskDataDto;
import com.njcn.access.pojo.dto.CsModelDto;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.pojo.po.CsSoftInfoPO;
import com.njcn.access.service.ICsDeviceService;
import com.njcn.access.service.ICsSoftInfoService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.*;
import com.njcn.access.utils.MqttUtil;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
@@ -41,6 +39,7 @@ import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -61,17 +60,19 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
private final EquipmentFeignClient equipmentFeignClient;
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
private final DictTreeFeignClient dictTreeFeignClient;
private final CsLedgerFeignClient csLedgerFeignClient;
private final ICsLedgerService csLedgerService;
private final CsLineFeignClient csLineFeignClient;
private final ICsDevModelRelationService csDevModelRelationService;
private final CsLineTopologyFeignClient csLineTopologyFeignClient;
private final ICsLineService csLineService;
private final DevModelRelationFeignClient devModelRelationFeignClient;
private final IAppLineTopologyDiagramService appLineTopologyDiagramService;
private final CsDeviceUserFeignClient csDeviceUserFeignClient;
private final ICsDeviceUserService csDeviceUserService;
private final MqttPublisher publisher;
@@ -125,7 +126,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
public Object getModel(String nDid) {
Object model = null;
try {
Thread.sleep(1000);
Thread.sleep(1500);
String key = "LINE" + nDid;
model = redisUtil.getObjectByKey(key);
if (Objects.isNull(model)){
@@ -152,7 +153,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csLedgerParam.setName(vo.getName());
csLedgerParam.setLevel(2);
csLedgerParam.setSort(0);
csLedgerFeignClient.add(csLedgerParam);
csLedgerService.addLedgerTree(csLedgerParam);
List<CsModelDto> modelId = objectToList(redisUtil.getObjectByKey("MODEL" + devAccessParam.getNDid()));
Integer clDid = null;
//2.新增装置-模板关系、获取电能质量的逻辑设备id
@@ -161,7 +162,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csDevModelRelationAddParm.setDevId(vo.getId());
csDevModelRelationAddParm.setModelId(item.getModelId());
csDevModelRelationAddParm.setDid(item.getDid());
devModelRelationFeignClient.addDevModelRelation(csDevModelRelationAddParm);
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
if (Objects.equals(item.getType(),1)){
clDid = item.getDid();
}
@@ -171,16 +172,13 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
}
askDevData(devAccessParam.getNDid(),AccessEnum.L_DEV_INFO.getCode(),version,clDid);
List<RspDataDto.LdevInfo> list = new ArrayList<>();
try {
//等待mqtt数据
Thread.sleep(500);
String key = "LINEDATA" + devAccessParam.getNDid();
list = objectToList2(redisUtil.getObjectByKey(key));
if (CollectionUtils.isEmpty(list)){
throw new BusinessException(AccessResponseEnum.LDEVINFO_IS_NULL);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
//3.监测点表录入关系
for (DevAccessParam.LineParam item : devAccessParam.getList()) {
String location = dicDataFeignClient.getDicDataById(item.getPosition()).getData().getCode();
@@ -210,7 +208,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
param.setName(item.getName());
param.setLevel(3);
param.setSort(0);
csLedgerFeignClient.add(param).getData();
csLedgerService.addLedgerTree(param);
AppLineTopologyDiagramPO appLineTopologyDiagramPo = new AppLineTopologyDiagramPO();
appLineTopologyDiagramPo.setId(devAccessParam.getTopologyDiagram());
appLineTopologyDiagramPo.setLineId(id);
@@ -219,47 +217,41 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
appLineTopologyDiagramPo.setStatus("1");
appLineTopologyDiagramPoList.add(appLineTopologyDiagramPo);
}
csLineFeignClient.addLineList(csLinePoList);
csLineService.saveBatch(csLinePoList);
//4.监测点拓扑图表录入关系
csLineTopologyFeignClient.addList(appLineTopologyDiagramPoList);
//5.修改装置状态
equipmentFeignClient.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode());
//6.绑定装置和人的关系
appLineTopologyDiagramService.saveBatch(appLineTopologyDiagramPoList);
//5.绑定装置和人的关系
CsDeviceUserPO po = new CsDeviceUserPO();
po.setPrimaryUserId(RequestUtil.getUserIndex());
po.setStatus("1");
po.setSubUserId(RequestUtil.getUserIndex());
po.setDeviceId(vo.getId());
csDeviceUserFeignClient.add(Collections.singletonList(po));
//todo 录入软件信息 SoftInfo
csDeviceUserService.saveBatch(Collections.singletonList(po));
//6.录入软件信息 SoftInfo
askDevData(devAccessParam.getNDid(),AccessEnum.SOFT_INFO.getCode(),version,0);
try {
//等待mqtt数据
Thread.sleep(500);
String key = "SOFTINFO" + devAccessParam.getNDid();
RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(redisUtil.getObjectByKey(key)), RspDataDto.SoftInfo.class);
String key2 = "SOFTINFO" + devAccessParam.getNDid();
RspDataDto.SoftInfo softInfo = JSON.parseObject(JSON.toJSONString(redisUtil.getObjectByKey(key2)), RspDataDto.SoftInfo.class);
if (Objects.isNull(softInfo)){
throw new BusinessException(AccessResponseEnum.SOFTINFO_IS_NULL);
}
//记录设备软件信息
CsSoftInfoPO csSoftInfoPo = new CsSoftInfoPO();
BeanUtils.copyProperties(softInfo,csSoftInfoPo);
csSoftInfoPo.setAppDate(new SimpleDateFormat("yyyy-MM-dd").parse(softInfo.getAppDate()));
csSoftInfoService.save(csSoftInfoPo);
//更新设备表软件信息
equipmentFeignClient.updateSoftInfoBynDid(devAccessParam.getNDid(),csSoftInfoPo.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
//todo 9.记录注册日志
//删除redis监测点模板信息
csEquipmentDeliveryService.updateSoftInfoBynDid(devAccessParam.getNDid(),csSoftInfoPo.getId());
//修改装置状态
csEquipmentDeliveryService.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode());
//7.发起自动接入请求
devAccess(devAccessParam.getNDid(),version);
//8.删除redis监测点模板信息
redisUtil.delete("MODEL" + devAccessParam.getNDid());
redisUtil.delete("LINE" + devAccessParam.getNDid());
redisUtil.delete("LINEDATA" + devAccessParam.getNDid());
redisUtil.delete("SOFTINFO" + devAccessParam.getNDid());
//发起自动接入请求
//devAccess(devAccessParam.getNDid(),version);
//todo 10.记录接入日志
} catch (Exception e) {
throw new BusinessException(CommonResponseEnum.FAIL);
}

View File

@@ -0,0 +1,22 @@
package com.njcn.access.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.CsDeviceUserMapper;
import com.njcn.access.service.ICsDeviceUserService;
import com.njcn.csdevice.pojo.po.CsDeviceUserPO;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
*
* Description:
* Date: 2023/6/27 9:40【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
@RequiredArgsConstructor
public class CsDeviceUserServiceImpl extends ServiceImpl<CsDeviceUserMapper, CsDeviceUserPO> implements ICsDeviceUserService {
}

View File

@@ -0,0 +1,61 @@
package com.njcn.access.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.CsEquipmentDeliveryMapper;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.util.Objects;
/**
*
* Description:
* 接口文档访问地址http://serverIP:port/swagger-ui.html
* Date: 2023/3/30 16:23【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
@RequiredArgsConstructor
public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliveryMapper, CsEquipmentDeliveryPO> implements ICsEquipmentDeliveryService {
@Override
public void updateStatusBynDid(String nDId,Integer status) {
LambdaUpdateWrapper<CsEquipmentDeliveryPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.set(CsEquipmentDeliveryPO::getStatus,status).eq(CsEquipmentDeliveryPO::getNdid,nDId);
this.update(lambdaUpdateWrapper);
}
@Override
public void updateSoftInfoBynDid(String nDid, String id) {
LambdaUpdateWrapper<CsEquipmentDeliveryPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.set(CsEquipmentDeliveryPO::getSoftinfoId,id).eq(CsEquipmentDeliveryPO::getNdid,nDid);
this.update(lambdaUpdateWrapper);
}
@Override
public void updateRunStatusBynDid(String nDid, Integer id) {
LambdaUpdateWrapper<CsEquipmentDeliveryPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.set(CsEquipmentDeliveryPO::getRunStatus,id).eq(CsEquipmentDeliveryPO::getNdid,nDid);
this.update(lambdaUpdateWrapper);
}
@Override
public CsEquipmentDeliveryVO queryEquipmentByndid(String ndid) {
CsEquipmentDeliveryVO result = new CsEquipmentDeliveryVO();
CsEquipmentDeliveryPO csEquipmentDeliveryPo = lambdaQuery().eq(CsEquipmentDeliveryPO::getNdid,ndid).ne(CsEquipmentDeliveryPO::getRunStatus,0).one();
if(Objects.isNull (csEquipmentDeliveryPo)){
return result;
}
BeanUtils.copyProperties(csEquipmentDeliveryPo,result);
return result;
}
}

View File

@@ -0,0 +1,42 @@
package com.njcn.access.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.CsLedgerMapper;
import com.njcn.access.service.ICsLedgerService;
import com.njcn.csdevice.pojo.param.CsLedgerParam;
import com.njcn.csdevice.pojo.po.CsLedger;
import lombok.AllArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.util.Objects;
/**
* <p>
* 台账表 服务实现类
* </p>
*
* @author xuyang
* @since 2023-05-31
*/
@Service
@AllArgsConstructor
public class CsLedgerServiceImpl extends ServiceImpl<CsLedgerMapper, CsLedger> implements ICsLedgerService {
@Override
public CsLedger addLedgerTree(CsLedgerParam csLedgerParam) {
CsLedger fatherCsLedger = this.lambdaQuery().eq(CsLedger::getId,csLedgerParam.getPid()).one();
CsLedger csLedger = new CsLedger();
BeanUtils.copyProperties(csLedgerParam,csLedger);
csLedger.setState(1);
if (Objects.equals(csLedgerParam.getPid(),"9999999")){
csLedger.setPid("0");
csLedger.setPids("0");
} else {
csLedger.setPids(fatherCsLedger.getPids() + "," + csLedgerParam.getPid());
}
this.save(csLedger);
return csLedger;
}
}

View File

@@ -0,0 +1,22 @@
package com.njcn.access.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.CsLineMapper;
import com.njcn.access.service.ICsLineService;
import com.njcn.csdevice.pojo.po.CsLinePO;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
*
* Description:
* Date: 2023/5/18 14:01【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
@RequiredArgsConstructor
public class CsLineServiceImpl extends ServiceImpl<CsLineMapper, CsLinePO> implements ICsLineService {
}