暂态波形文件接收解析功能

This commit is contained in:
2023-10-08 10:46:58 +08:00
parent 29407e4389
commit 34fc978543
9 changed files with 236 additions and 96 deletions

View File

@@ -43,6 +43,7 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@@ -52,6 +53,8 @@ import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
@@ -361,6 +364,7 @@ public class MqttMessageHandler {
/**
* 装置心跳 && 主动数据上送
* fixme 这边由于接收文件数据时间跨度会很长,途中有其他请求进来会中断之前的程序,目前是记录中断的位置,等处理完成再继续请求接收文件
* @param topic
* @param message
* @param version
@@ -388,7 +392,7 @@ public class MqttMessageHandler {
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_29.getCode()));
reqAndResParam.setCode(200);
//fixme 前置处理的时间应该是UTC时间所以需要加8小时。
String json = "{Time:\""+(System.currentTimeMillis()/1000+8*3600)+"\"}";
String json = "{Time:"+(System.currentTimeMillis()/1000+8*3600)+"}";
net.sf.json.JSONObject jsonObject = net.sf.json.JSONObject.fromObject(json);
reqAndResParam.setMsg(jsonObject);
publisher.send("/Dev/DataRsp/"+version+"/"+nDid,gson.toJson(reqAndResParam),1,false);
@@ -407,6 +411,18 @@ public class MqttMessageHandler {
break;
case 4866:
AutoDataDto dataDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), AutoDataDto.class);
//mid大于0则需要应答设备侧
if (dataDto.getMid() > 0){
ReqAndResDto.Res response = new ReqAndResDto.Res();
response.setMid(dataDto.getMid());
response.setDid(dataDto.getDid());
response.setPri(AccessEnum.FIRST_CHANNEL.getCode());
response.setType(Integer.parseInt(TypeEnum.TYPE_15.getCode()));
response.setCode(200);
log.info("应答事件:" + new Gson().toJson(response));
publisher.send("/Dev/DataRsp/"+version+"/"+nDid,new Gson().toJson(response),1,false);
}
//判断事件类型
switch (dataDto.getMsg().getDataAttr()) {
//暂态事件、录波处理
case 0:
@@ -433,17 +449,6 @@ public class MqttMessageHandler {
default:
break;
}
//mid大于0则需要应答设备侧
if (dataDto.getMid() > 0){
ReqAndResDto.Res response = new ReqAndResDto.Res();
response.setMid(dataDto.getMid());
response.setDid(dataDto.getDid());
response.setPri(AccessEnum.FIRST_CHANNEL.getCode());
response.setType(Integer.parseInt(TypeEnum.TYPE_15.getCode()));
response.setCode(200);
log.info("应答事件:" + new Gson().toJson(response));
publisher.send("/Dev/DataRsp/"+version+"/"+nDid,new Gson().toJson(response),1,false);
}
break;
default:
break;
@@ -471,15 +476,16 @@ public class MqttMessageHandler {
switch (fileDto.getType()){
case 4657:
log.info("获取文件信息");
log.info("文件信息响应:" + fileDto);
appFileMessageTemplate.sendMember(appFileMessage);
break;
case 4658:
log.info("获取文件流信息");
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName() + appFileMessage.getMid()), appFileMessage.getMid(),3600L);
appFileStreamMessageTemplate.sendMember(appFileMessage);
break;
default:
break;
}
}
}

View File

@@ -1,47 +1,47 @@
//package com.njcn.access.runner;
//
//import com.njcn.access.service.ICsEquipmentDeliveryService;
//import com.njcn.access.service.ICsTopicService;
//import com.njcn.access.service.impl.CsDeviceServiceImpl;
//import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.boot.ApplicationArguments;
//import org.springframework.boot.ApplicationRunner;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.Resource;
//import java.util.List;
//import java.util.Objects;
//
///**
// * 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入
// *
// * @author xuyang
// * @version 1.0.0
// * @createTime 2023/8/28 13:57
// */
//@Component
//@Slf4j
//public class AccessApplicationRunner implements ApplicationRunner {
//
// @Resource
// private CsDeviceServiceImpl csDeviceService;
//
// @Resource
// private ICsTopicService csTopicService;
//
// @Resource
// private ICsEquipmentDeliveryService csEquipmentDeliveryService;
//
// @Override
// public void run(ApplicationArguments args){
// List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getAll();
// list.forEach(item->{
// String version = csTopicService.getVersion(item.getNdid());
// if (!Objects.isNull(version)){
// csDeviceService.devAccess(item.getNdid(),version);
// }
// });
// }
//
//}
package com.njcn.access.runner;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
/**
* 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/28 13:57
*/
@Component
@Slf4j
public class AccessApplicationRunner implements ApplicationRunner {
@Resource
private CsDeviceServiceImpl csDeviceService;
@Resource
private ICsTopicService csTopicService;
@Resource
private ICsEquipmentDeliveryService csEquipmentDeliveryService;
@Override
public void run(ApplicationArguments args){
List<CsEquipmentDeliveryPO> list = csEquipmentDeliveryService.getAll();
list.forEach(item->{
String version = csTopicService.getVersion(item.getNdid());
if (!Objects.isNull(version)){
csDeviceService.devAccess(item.getNdid(),version);
}
});
}
}