代码提交

This commit is contained in:
2023-05-12 15:42:33 +08:00
parent 4c29d3869f
commit 78b4513880
72 changed files with 1535 additions and 792 deletions

View File

@@ -43,7 +43,4 @@ public class AccessController extends BaseController {
accessService.add(nDid);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -0,0 +1,54 @@
package com.njcn.access.controller;
import com.njcn.access.pojo.MessageParam;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.validation.Valid;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/4/18 13:53
*/
@Slf4j
@RestController
@RequestMapping("/test")
@Api(value = "TestController", tags = {"测试 API"})
public class KafkaController extends BaseController {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/kafka/sendMessage")
@ApiOperation(value = "发送kafka告警消息")
public void sendKafkaMessage(@Valid @ApiParam("参数") @RequestBody MessageParam param) {
kafkaTemplate.send(param.getTopic(), param.getMessage());
}
@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"topic2"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
}

View File

@@ -0,0 +1,48 @@
package com.njcn.access.controller;
import com.njcn.access.service.ICsTopicService;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.common.utils.LogUtil;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/5/12 10:28
*/
@Slf4j
@RestController
@RequestMapping("/topic")
@RequiredArgsConstructor
@Api(tags = "装置主题")
public class TopicController extends BaseController {
private final ICsTopicService csTopicService;
@PostMapping("/ask")
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("平台询问装置支持主题")
@ApiImplicitParam(name = "nDid", value = "网关识别码", required = true)
public HttpResult<Object> ask(@RequestParam String nDid){
String methodDescribe = getMethodDescribe("ask");
LogUtil.njcnDebug(log, "{}平台询问装置支持主题请求的nDid为{}", methodDescribe, nDid);
csTopicService.askDevTopic(nDid);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -5,26 +5,39 @@ import com.github.tocrhz.mqtt.annotation.MqttSubscribe;
import com.github.tocrhz.mqtt.annotation.NamedValue;
import com.github.tocrhz.mqtt.annotation.Payload;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.enums.AccessStatusEnum;
import com.njcn.access.pojo.dto.*;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.AccessResponseEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.DevModInfoDto;
import com.njcn.access.pojo.dto.ModelDto;
import com.njcn.access.pojo.dto.TopicDto;
import com.njcn.access.pojo.dto.heart.HeartBeatDto;
import com.njcn.access.pojo.param.ReqAndResParam;
import com.njcn.access.pojo.po.CsTopicPO;
import com.njcn.access.service.ICsTopicService;
import com.njcn.algorithm.api.DevModelFeignClient;
import com.njcn.algorithm.api.EquipmentFeignClient;
import com.njcn.algorithm.pojo.param.CsDevModelQueryListParm;
import com.njcn.algorithm.pojo.vo.CsDevModelPageVO;
import com.njcn.algorithm.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.PubUtils;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.DictData;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.*;
/**
* @author hongawen
@@ -46,35 +59,82 @@ public class MqttMessageHandler {
private final RedisUtil redisUtil;
private final ICsTopicService csTopicService;
@Autowired
Validator validator;
@MqttSubscribe(value = "/Dev/Topic/{edgeId}",qos = 1)
@Transactional(rollbackFor = Exception.class)
public void devTopic(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){
Gson gson = new Gson();
ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class);
//检验传递的参数是否准确
Set<ConstraintViolation<ReqAndResParam.Res>> validate = validator.validate(res);
validate.forEach(constraintViolation -> {
System.out.println(constraintViolation.getMessage());
});
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
if (Objects.equals(res.getType(), TypeEnum.TYPE_1.getCode())){
List<CsTopicPO> list = new ArrayList<>();
//fixme 这边获取数据需要调整
Map<String,List<String>> map = (Map<String,List<String>>)res.getMsg();
List<String> topicList = map.get("Topic");
topicList.forEach(item->{
CsTopicPO csTopicPo = new CsTopicPO();
csTopicPo.setNdid(nDid);
csTopicPo.setTopic(item);
csTopicPo.setType(0);
list.add(csTopicPo);
});
csTopicService.addList(list);
} else {
log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
}
} else {
log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage());
}
}
/**
* 接收装置接入响应
* 装置注册应答
* 1.收到注册信息,修改装置出厂表,装置的状态,调整为注册;然后开始接入流程
* 2.询问当前装置类型的模板。有则完成接入;没有则告警出来,需要人工手动上传模板信息
* @param topic
* @param message
* @param payload
*/
@MqttSubscribe(value = "/device/register/{nDid}",qos = 1)
public void devOperation(String topic, MqttMessage message, @NamedValue("nDid") String nDid, @Payload String payload){
@MqttSubscribe(value = "/Dev/Reg/{edgeId}",qos = 1)
@Transactional(rollbackFor = Exception.class)
public void devOperation(String topic, MqttMessage message, @NamedValue("edgeId") String nDid, @Payload String payload){
Gson gson = new Gson();
RegisterDTO.RegisterResponse registerDTO = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), RegisterDTO.RegisterResponse.class);
if (registerDTO.getCode() == 200){
equipmentFeignClient.updateStatusBynDid(nDid, AccessStatusEnum.REGISTERED.getCode());
PublicDto publicDto = new PublicDto();
publicDto.setMid(Long.toString(Instant.now().toEpochMilli()));
publicDto.setNDid(nDid);
publicDto.setTimestamp(Instant.now().toEpochMilli());
publicDto.setType("CMD_DEV_DATA");
AccessDto accessDto = new AccessDto();
accessDto.setNDid(nDid);
accessDto.setDevType(registerDTO.getParam().getDev_type());
publicDto.setParam(accessDto);
publisher.send("/platform/devcmd/"+nDid,new Gson().toJson(publicDto),1,false);
ReqAndResParam.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class);
//检验传递的参数是否准确
Set<ConstraintViolation<ReqAndResParam.Res>> validate = validator.validate(res);
validate.forEach(constraintViolation -> {
System.out.println(constraintViolation.getMessage());
});
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
if (Objects.equals(res.getType(),TypeEnum.TYPE_17.getCode())){
equipmentFeignClient.updateStatusBynDid(nDid, AccessEnum.REGISTERED.getCode());
//询问模板数据
ReqAndResParam.Req reqAndResParam = new ReqAndResParam.Req();
reqAndResParam.setMid(1);
reqAndResParam.setDid("0");
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(TypeEnum.TYPE_3.getCode());
reqAndResParam.setExpire(-1);
publisher.send("/Pfm/DevCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false);
} else {
log.info(AccessResponseEnum.MESSAGE_TYPE_ERROR.getMessage());
}
} else {
log.info(AccessResponseEnum.RESPONSE_ERROR.getMessage());
}
}
/**
* 装置类型模板
* 装置类型模板应
* 1.判断网关的类型
* 2.直联设备的DevCfg和DevMod是以直联设备为准上送平台端平台端保存。通过校验DevMod模板信息来从平台端模板池中选取对应的模板如果找不到匹配模板需告警提示人工干预处理。
* 3.平台端需读取装置的DevMod来判断网关支持的设备模板包含设备型号和模板版本根据app提交的接入子设备DID匹配数据模板型号及版本生成DevCfg下发给网关网关根据下发信息生成就地设备点表。
@@ -83,42 +143,92 @@ public class MqttMessageHandler {
* @param nDid
* @param payload
*/
@MqttSubscribe(value = "/device/devack/{nDid}",qos = 1)
public void devModelOperation(String topic, MqttMessage message, @NamedValue("nDid") String nDid, @Payload String payload){
@MqttSubscribe(value = "/Pfm/DevRsp/{version}/{edgeId}",qos = 1)
@Transactional(rollbackFor = Exception.class)
public void devModelOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){
Gson gson = new Gson();
ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class);
HttpResult<CsEquipmentDeliveryVO> pojo = equipmentFeignClient.queryEquipmentByndid(nDid);
if (!Objects.isNull(pojo)){
String devType = pojo.getData().getDevType();
if (Objects.equals(devType,"直连设备")){
List<DevModelDto> list = modelDto.getDevMod();
String dictCode = dicDataFeignClient.getDicDataById(devType).getData().getCode();
//直连设备处理
if (Objects.equals(dictCode, DicDataEnum.CONNECT_DEV.getCode())){
List<DevModInfoDto> list = modelDto.getDevMod();
list.forEach(item->{
DictData dicData = dicDataFeignClient.getDicDataByCode(item.getDevType()).getData();
CsDevModelQueryListParm csDevModelQueryListParm = new CsDevModelQueryListParm();
if (Objects.isNull(dicData)) {
log.info("新增模板失败,获取装置类型字典数据为空,请先录入装置类型!");
log.info(AccessResponseEnum.DEV_TYPE_NOT_FIND.getMessage());
return;
} else {
csDevModelQueryListParm.setDevType( dicData.getId());
csDevModelQueryListParm.setDevType(dicData.getId());
}
csDevModelQueryListParm.setVersionNo(item.getVersionNo());
csDevModelQueryListParm.setVersionDate(item.getVersionDate());
CsDevModelPageVO csDevModelPageVO = devModelFeignClient.queryDevModelOne(csDevModelQueryListParm).getData();
if (Objects.isNull(csDevModelPageVO)){
log.info("模板不存在,请先录入模板数据!");
log.info(AccessResponseEnum.MODEL_NO_FIND.getMessage());
} else {
equipmentFeignClient.updateStatusBynDid(nDid, AccessStatusEnum.ACCESS.getCode());
//todo 录入装置和模板的关系表
ReqAndResParam.Req reqAndResParam = new ReqAndResParam.Req();
reqAndResParam.setMid(1);
reqAndResParam.setDid("0");
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(TypeEnum.TYPE_5.getCode());
reqAndResParam.setExpire(-1);
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
//将装置状态改为接入
equipmentFeignClient.updateStatusBynDid(nDid, AccessEnum.ACCESS.getCode());
//设置心跳时间,超时改为掉线
redisUtil.saveByKeyWithExpire("MQTT:" + nDid,Instant.now().toEpochMilli(),180L);
}
});
} else if (Objects.equals(devType,"网关")){
}
//网关处理 生成报文下发给装置,装置响应版本
else if (Objects.equals(dictCode, DicDataEnum.GATEWAY_DEV.getCode())){
//todo 处理待定
System.out.println("网关设备判断");
}
} else {
log.info("通过nDid未找到相关装置信息!");
log.info(AccessResponseEnum.DEV_NO_FIND.getMessage());
}
}
/**
* 装置心跳
* @param topic
* @param message
* @param version
* @param nDid
* @param payload
*/
@MqttSubscribe(value = "/Dev/PfmCmd/{version}/{edgeId}",qos = 1)
@Transactional(rollbackFor = Exception.class)
public void devHeartBeat(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload) {
//响应请求
ReqAndResParam.Req reqAndResParam = new ReqAndResParam.Req();
HeartBeatDto heartBeatDto = new HeartBeatDto();
heartBeatDto.setTime(System.currentTimeMillis()/1000);
reqAndResParam.setMid(1);
reqAndResParam.setDid("0");
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(TypeEnum.TYPE_14.getCode());
reqAndResParam.setExpire(-1);
reqAndResParam.setMsg(heartBeatDto);
publisher.send("/Dev/PfmRsp/"+version+"/"+nDid,PubUtils.obj2json(reqAndResParam),1,false);
//处理业务逻辑
ReqAndResParam.Res res = PubUtils.json2obj(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResParam.Res.class);
Object object = res.getMsg();
if (!Objects.isNull(object)){
List<String> abnormalList = new ArrayList<>();
if (object instanceof ArrayList<?>){
abnormalList.addAll((List<String>) object);
}
//todo 需要处理异常异常设备
abnormalList.forEach(item->{
System.out.println("异常设备ID"+item);
});
}
}

View File

@@ -0,0 +1,66 @@
package com.njcn.access.listener;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.redis.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* @author hongawen
* @version 1.0.0
* @date 2022年04月02日 14:31
*/
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
@Resource
private RedisUtil redisUtil;
@Resource
private MqttPublisher publisher;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 针对redis数据失效事件进行数据处理
* 注意message.toString()可以获取失效的key
*/
@Override
@Transactional(rollbackFor = {Exception.class})
public void onMessage(Message message, byte[] pattern) {
if (StringUtils.isBlank(message.toString())) {
return;
}
//判断失效的key是否为MQTT消费端存入的
String expiredKey = message.toString();
if(expiredKey.startsWith("MQTT:")){
Gson gson = new Gson();
String ndid = expiredKey.split(":")[1];
//网关下线
// netDevService.offlineNetDev(ndid);
String result = (String) redisUtil.getObjectByKey(ndid);
//装置下线
// List<String> list = modelService.monitorHeartbeat(ndid,result);
//生成通知报文
// DeviceOperateDTO deviceOperateDTO = new DeviceOperateDTO();
// deviceOperateDTO.setNdid(ndid);
// deviceOperateDTO.setDid(list);
// deviceOperateDTO.setTime(Long.toString(System.currentTimeMillis()/1000));
// deviceOperateDTO.setMessage("装置下线");
// deviceOperateDTO.setType("005");
// publisher.send("/device/platform",gson.toJson(deviceOperateDTO),1,false);
}
}
}

View File

@@ -0,0 +1,16 @@
package com.njcn.access.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.access.pojo.po.CsBmdPO;
/**
* <p>
* Mapper 接口
* </p>
*
* @author xuyang
* @since 2023-05-11
*/
public interface CsBmdMapper extends BaseMapper<CsBmdPO> {
}

View File

@@ -0,0 +1,16 @@
package com.njcn.access.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.access.pojo.po.CsEpdPqdPO;
/**
* <p>
* Mapper 接口
* </p>
*
* @author xuyang
* @since 2023-05-11
*/
public interface CsEpdPqdMapper extends BaseMapper<CsEpdPqdPO> {
}

View File

@@ -0,0 +1,18 @@
package com.njcn.access.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.access.pojo.po.CsTopicPO;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* Mapper 接口
* </p>
*
* @author xuyang
* @since 2023-05-12
*/
@Mapper
public interface CsTopicMapper extends BaseMapper<CsTopicPO> {
}

View File

@@ -1,17 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.njcn.quality.mapper.DataGroupMapper">
<select id="getGroupDataList" resultType="DataGroupTemplateVO">
select id,pid,name,sort,1 as level from ele_data_group
where pid = #{id}
order by sort
</select>
<select id="getGroupList" resultType="DataGroupTemplateVO">
select id,pid,name,sort from ele_data_group
where pid = #{id}
order by sort
</select>
</mapper>

View File

@@ -0,0 +1,16 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.access.pojo.po.CsBmdPO;
/**
* <p>
* 服务类
* </p>
*
* @author xuyang
* @since 2023-05-11
*/
public interface ICsBmdService extends IService<CsBmdPO> {
}

View File

@@ -0,0 +1,16 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.access.pojo.po.CsEpdPqdPO;
/**
* <p>
* 服务类
* </p>
*
* @author xuyang
* @since 2023-05-11
*/
public interface ICsEpdPqdService extends IService<CsEpdPqdPO> {
}

View File

@@ -0,0 +1,22 @@
package com.njcn.access.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.access.pojo.po.CsTopicPO;
import java.util.List;
/**
* <p>
* 服务类
* </p>
*
* @author xuyang
* @since 2023-05-12
*/
public interface ICsTopicService {
void askDevTopic(String nDid);
boolean addList(List<CsTopicPO> list);
}

View File

@@ -1,19 +1,19 @@
package com.njcn.access.service.serviceImpl;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.AccessDto;
import com.njcn.access.pojo.dto.PublicDto;
import com.njcn.access.pojo.param.ReqAndResParam;
import com.njcn.access.service.IAccessService;
import com.njcn.algorithm.api.EquipmentFeignClient;
import com.njcn.algorithm.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.PubUtils;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.Objects;
/**
@@ -40,15 +40,16 @@ public class AccessServiceImpl implements IAccessService {
logger.error("平台侧无此网关信息,请先录入!");
return;
}
PublicDto publicDto = new PublicDto();
publicDto.setMid(Long.toString(Instant.now().toEpochMilli()));
publicDto.setNDid(nDid);
publicDto.setTimestamp(Instant.now().toEpochMilli());
publicDto.setType("CMD_DEV_REGISTER");
ReqAndResParam.Req reqAndResParam = new ReqAndResParam.Req();
reqAndResParam.setMid(1);
reqAndResParam.setDid("0");
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(TypeEnum.TYPE_2.getCode());
reqAndResParam.setExpire(-1);
AccessDto accessDto = new AccessDto();
accessDto.setNDid(nDid);
accessDto.setDevType(vo.getDevModel());
publicDto.setParam(accessDto);
publisher.send("/platform/register/"+nDid,new Gson().toJson(publicDto),1,false);
reqAndResParam.setMsg(accessDto);
publisher.send("/platform/register/"+nDid, PubUtils.obj2json(reqAndResParam),1,false);
}
}

View File

@@ -0,0 +1,20 @@
package com.njcn.access.service.serviceImpl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.CsBmdMapper;
import com.njcn.access.pojo.po.CsBmdPO;
import com.njcn.access.service.ICsBmdService;
import org.springframework.stereotype.Service;
/**
* <p>
* 服务实现类
* </p>
*
* @author xuyang
* @since 2023-05-11
*/
@Service
public class CsBmdServiceImpl extends ServiceImpl<CsBmdMapper, CsBmdPO> implements ICsBmdService {
}

View File

@@ -0,0 +1,20 @@
package com.njcn.access.service.serviceImpl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.mapper.CsEpdPqdMapper;
import com.njcn.access.pojo.po.CsEpdPqdPO;
import com.njcn.access.service.ICsEpdPqdService;
import org.springframework.stereotype.Service;
/**
* <p>
* 服务实现类
* </p>
*
* @author xuyang
* @since 2023-05-11
*/
@Service
public class CsEpdPqdServiceImpl extends ServiceImpl<CsEpdPqdMapper, CsEpdPqdPO> implements ICsEpdPqdService {
}

View File

@@ -0,0 +1,61 @@
package com.njcn.access.service.serviceImpl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.mapper.CsTopicMapper;
import com.njcn.access.pojo.param.ReqAndResParam;
import com.njcn.access.pojo.po.CsTopicPO;
import com.njcn.access.service.ICsTopicService;
import com.njcn.algorithm.api.EquipmentFeignClient;
import com.njcn.algorithm.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.common.utils.PubUtils;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Objects;
/**
* <p>
* 服务实现类
* </p>
*
* @author xuyang
* @since 2023-05-12
*/
@Service
@AllArgsConstructor
public class CsTopicServiceImpl extends ServiceImpl<CsTopicMapper, CsTopicPO> implements ICsTopicService {
private static final Logger logger = LoggerFactory.getLogger(CsTopicServiceImpl.class);
private final MqttPublisher publisher;
private final EquipmentFeignClient equipmentFeignClient;
@Override
public void askDevTopic(String nDid) {
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
if (Objects.isNull(vo)){
logger.error("平台侧无此网关信息,请先录入!");
return;
}
ReqAndResParam.Req reqAndResParam = new ReqAndResParam.Req();
reqAndResParam.setMid(1);
reqAndResParam.setDid("0");
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(TypeEnum.TYPE_1.getCode());
reqAndResParam.setExpire(-1);
reqAndResParam.setMsg(null);
publisher.send("/Pfm/Topic/"+nDid, PubUtils.obj2json(reqAndResParam),1,false);
}
@Override
public boolean addList(List<CsTopicPO> list) {
return this.saveOrUpdateBatch(list,100);
}
}

View File

@@ -53,75 +53,77 @@ public class DevModelServiceImpl implements IDevModelService {
String json = null;
try {
json = JsonUtil.convertStreamToString(devModelParam.getFile().getInputStream());
Gson gson = new Gson();
TemplateDto templateDto = gson.fromJson(json, TemplateDto.class);
ParamDto pojo = templateDto.getParam();
//网关模板
NetDevModDto po1 = pojo.getDataArray().get(0).getTemplate().getNetDevModDto();
//装置信息模板
DevCfgDetailDto po2 = pojo.getDataArray().get(0).getTemplate().getDevCfgDetailDto();
//装置数据模板
List<DevModDetailDto> po3 = pojo.getDataArray().get(0).getTemplate().getDevModDetailDto();
String name = po3.get(0).getName();
String version = po3.get(0).getVersion();
String time = po3.get(0).getTime();
String devType = po3.get(0).getDevType();
String devTypeId = "";
DictData dicData = dicDataFeignClient.getDicDataByCode(devType).getData();
if (Objects.isNull(dicData)) {
log.info("新增模板失败,获取装置类型字典数据为空,请先录入装置类型!");
return HttpResultUtil.assembleResult(CommonResponseEnum.NO_DATA.getCode(), null, "获取装置类型字典数据为空!");
} else {
devTypeId = dicData.getId();
}
CsDevModelQueryListParm csDevModelQueryListParm = new CsDevModelQueryListParm();
csDevModelQueryListParm.setDevType(devTypeId);
csDevModelQueryListParm.setVersionNo(version);
csDevModelQueryListParm.setVersionDate(time);
csDevModelQueryListParm.setName(name);
CsDevModelPageVO vo = devModelFeignClient.queryDevModelOne(csDevModelQueryListParm).getData();
if (!Objects.isNull(vo)){
log.info("新增模板失败,新增的模板在库中存在!");
return HttpResultUtil.assembleResult(AccessResponseEnum.MODEL_REPEAT.getCode(), null, AccessResponseEnum.MODEL_REPEAT.getMessage());
} else {
CsDevModelAddParm csDevModelAddParm = new CsDevModelAddParm();
csDevModelAddParm.setName(name);
csDevModelAddParm.setDevType(devTypeId);
csDevModelAddParm.setVersionNo(version);
csDevModelAddParm.setVersionDate(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(time));
String filePath = fileStorageUtil.uploadMultipart(devModelParam.getFile(), OssPath.DEV_MODEL + devModelParam.getDevType() + "_");
log.info("文件路径为:" + filePath);
csDevModelAddParm.setFilePath(filePath);
//新增cs_dev_model表数据
CsDevModelPO csDevModelPO = devModelFeignClient.addDevModel(csDevModelAddParm).getData();
//新增cs_data_set
//新增cs_data_array
}
// ParamDto pojo = templateDto.getParam();
// //网关模板
// NetDevModDto po1 = pojo.getDataArray().get(0).getTemplate().getNetDevModDto();
// //装置信息模板
// DevCfgDetailDto po2 = pojo.getDataArray().get(0).getTemplate().getDevCfgDetailDto();
// //装置数据模板
// List<DevModDetailDto> po3 = pojo.getDataArray().get(0).getTemplate().getDevModDetailDto();
//
// String name = po3.get(0).getName();
// String version = po3.get(0).getVersion();
// String time = po3.get(0).getTime();
// String devType = po3.get(0).getDevType();
// String devTypeId = "";
//
// DictData dicData = dicDataFeignClient.getDicDataByCode(devType).getData();
// if (Objects.isNull(dicData)) {
// log.info("新增模板失败,获取装置类型字典数据为空,请先录入装置类型!");
// return HttpResultUtil.assembleResult(CommonResponseEnum.NO_DATA.getCode(), null, "获取装置类型字典数据为空!");
// } else {
// devTypeId = dicData.getId();
// }
// CsDevModelQueryListParm csDevModelQueryListParm = new CsDevModelQueryListParm();
// csDevModelQueryListParm.setDevType(devTypeId);
// csDevModelQueryListParm.setVersionNo(version);
// csDevModelQueryListParm.setVersionDate(time);
// csDevModelQueryListParm.setName(name);
// CsDevModelPageVO vo = devModelFeignClient.queryDevModelOne(csDevModelQueryListParm).getData();
// if (!Objects.isNull(vo)){
// log.info("新增模板失败,新增的模板在库中存在!");
// return HttpResultUtil.assembleResult(AccessResponseEnum.MODEL_REPEAT.getCode(), null, AccessResponseEnum.MODEL_REPEAT.getMessage());
// } else {
// CsDevModelAddParm csDevModelAddParm = new CsDevModelAddParm();
// csDevModelAddParm.setName(name);
// csDevModelAddParm.setDevType(devTypeId);
// csDevModelAddParm.setVersionNo(version);
// csDevModelAddParm.setVersionDate(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(time));
// String filePath = fileStorageUtil.uploadMultipart(devModelParam.getFile(), OssPath.DEV_MODEL + devModelParam.getDevType() + "_");
// log.info("文件路径为:" + filePath);
// csDevModelAddParm.setFilePath(filePath);
// //新增cs_dev_model表数据
// CsDevModelPO csDevModelPO = devModelFeignClient.addDevModel(csDevModelAddParm).getData();
// //新增cs_data_set
//
// //新增cs_data_array
//
// }
} catch (IOException e) {
log.error("文件转成json出现异常");
e.getMessage();
} catch (ParseException e) {
log.error("时间转换出现异常");
e.getMessage();
}
// catch (ParseException e) {
// log.error("时间转换出现异常");
// e.getMessage();
// }
return HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), null, CommonResponseEnum.SUCCESS.getMessage());
}
/**
* 新增cs_data_set
*/
public void insertDataSet(List<DataSetDTO> dataSet,String id){
dataSet.forEach(item->{
DataSetDTO dataSetDTO = new DataSetDTO();
});
}
// public void insertDataSet(List<DataSetDTO> dataSet,String id){
// dataSet.forEach(item->{
// DataSetDTO dataSetDTO = new DataSetDTO();
//
//
//
// });
// }
}

View File

@@ -33,6 +33,8 @@ spring:
refresh: true
- data-Id: share-config-datasource-db.yaml
refresh: true
- data-Id: kafka-config.yaml
refresh: true
main:
allow-bean-definition-overriding: true