MQTT上传文件至装置
This commit is contained in:
@@ -13,10 +13,7 @@ import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestPart;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
/**
|
||||
@@ -48,5 +45,14 @@ public class CsDevModelController extends BaseController {
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
|
||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||
@PostMapping("/uploadDevFile")
|
||||
@ApiOperation("上传文件至装置")
|
||||
@Transactional(rollbackFor = {Exception.class})
|
||||
public HttpResult<String> uploadDevFile(@RequestPart("file") @Validated MultipartFile file, @RequestParam("id") @Validated String id, @RequestParam("filePath") @Validated String path){
|
||||
String methodDescribe = getMethodDescribe("uploadDevFile");
|
||||
csDevModelService.uploadDevFile(file,id,path);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ import com.njcn.access.enums.TypeEnum;
|
||||
import com.njcn.access.pojo.RspDataDto;
|
||||
import com.njcn.access.pojo.dto.*;
|
||||
import com.njcn.access.pojo.dto.file.FileDto;
|
||||
import com.njcn.access.pojo.dto.file.FileRedisDto;
|
||||
import com.njcn.access.pojo.param.ReqAndResParam;
|
||||
import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
|
||||
import com.njcn.access.pojo.po.CsLineModel;
|
||||
@@ -517,10 +518,12 @@ public class MqttMessageHandler {
|
||||
break;
|
||||
//处理主动上送的统计数据
|
||||
case 2:
|
||||
log.info(nDid + "处理统计数据");
|
||||
JSONObject jsonObject2 = JSONObject.parseObject(JSON.toJSONString(dataDto));
|
||||
AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject2, AppAutoDataMessage.class);
|
||||
appAutoDataMessage.setId(nDid);
|
||||
appAutoDataMessage.getMsg().getDataArray().forEach(item->{
|
||||
log.info(nDid + "处理统计数据" + item.getDataAttr());
|
||||
});
|
||||
appAutoDataMessageTemplate.sendMember(appAutoDataMessage);
|
||||
break;
|
||||
default:
|
||||
@@ -566,6 +569,12 @@ public class MqttMessageHandler {
|
||||
log.info("需要缓存请求的文件信息");
|
||||
}
|
||||
break;
|
||||
case 4659:
|
||||
log.info("装置收到系统上传的文件");
|
||||
FileRedisDto fileRedisDto = new FileRedisDto();
|
||||
fileRedisDto.setCode(fileDto.getCode());
|
||||
redisUtil.saveByKeyWithExpire("uploadFileStep",fileRedisDto,10L);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@@ -583,7 +592,7 @@ public class MqttMessageHandler {
|
||||
reqAndResParam.setMid(mid);
|
||||
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
||||
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_6.getCode()));
|
||||
reqAndResParam.setExpire(1);
|
||||
reqAndResParam.setExpire(-1);
|
||||
AskDataDto askDataDto = new AskDataDto();
|
||||
askDataDto.setDataAttr(0);
|
||||
askDataDto.setOperate(1);
|
||||
|
||||
@@ -26,4 +26,11 @@ public interface ICsDevModelService {
|
||||
*/
|
||||
void addDict(MultipartFile file);
|
||||
|
||||
/**
|
||||
* 上传文件至装置
|
||||
* 更新程序、更新模板
|
||||
*/
|
||||
void uploadDevFile(MultipartFile file, String id, String path);
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -3,12 +3,18 @@ package com.njcn.access.service.impl;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||
import com.njcn.access.enums.AccessEnum;
|
||||
import com.njcn.access.enums.AccessResponseEnum;
|
||||
import com.njcn.access.enums.DataModel;
|
||||
import com.njcn.access.enums.TypeEnum;
|
||||
import com.njcn.access.handler.MqttMessageHandler;
|
||||
import com.njcn.access.mapper.CsDevModelMapper;
|
||||
import com.njcn.access.pojo.dto.ReqAndResDto;
|
||||
import com.njcn.access.pojo.dto.UploadFileDto;
|
||||
import com.njcn.access.pojo.dto.data.*;
|
||||
import com.njcn.access.pojo.dto.devModel.*;
|
||||
import com.njcn.access.pojo.dto.file.FileRedisDto;
|
||||
import com.njcn.access.pojo.po.CsLineModel;
|
||||
import com.njcn.access.service.*;
|
||||
import com.njcn.access.utils.JsonUtil;
|
||||
@@ -17,11 +23,12 @@ import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.csdevice.api.CsLogsFeignClient;
|
||||
import com.njcn.csdevice.api.DevModelFeignClient;
|
||||
import com.njcn.csdevice.pojo.po.*;
|
||||
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
|
||||
import com.njcn.oss.constant.OssPath;
|
||||
import com.njcn.oss.utils.FileStorageUtil;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.api.*;
|
||||
import com.njcn.system.enums.DicDataEnum;
|
||||
import com.njcn.system.enums.DicTreeEnum;
|
||||
import com.njcn.system.pojo.param.CsWaveParam;
|
||||
import com.njcn.system.pojo.param.EleEpdPqdParam;
|
||||
import com.njcn.system.pojo.param.EleEvtParam;
|
||||
@@ -34,6 +41,7 @@ import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Date;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@@ -79,6 +87,16 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
|
||||
|
||||
private final DictTreeFeignClient dictTreeFeignClient;
|
||||
|
||||
private final MqttPublisher publisher;
|
||||
|
||||
private final ICsTopicService csTopicService;
|
||||
|
||||
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
|
||||
|
||||
private final RedisUtil redisUtil;
|
||||
|
||||
private final MqttMessageHandler mqttMessageHandler;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = {Exception.class})
|
||||
public void addModel(MultipartFile file) {
|
||||
@@ -134,6 +152,130 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void uploadDevFile(MultipartFile file,String id,String path) {
|
||||
DeviceLogDTO logDto = null;
|
||||
try {
|
||||
//判断nDid是否存在
|
||||
CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(id);
|
||||
logDto = new DeviceLogDTO();
|
||||
logDto.setUserName(RequestUtil.getUserNickname());
|
||||
logDto.setLoginName(RequestUtil.getUsername());
|
||||
if (Objects.isNull(csEquipmentDeliveryVO.getNdid())) {
|
||||
logDto.setResult(0);
|
||||
logDto.setFailReason(AccessResponseEnum.NDID_NO_FIND.getMessage());
|
||||
csLogsFeignClient.addUserLog(logDto);
|
||||
throw new BusinessException(AccessResponseEnum.NDID_NO_FIND);
|
||||
}
|
||||
//存储文件至文件服务器
|
||||
fileStorageUtil.uploadMultipart(file, OssPath.SYSTEM_TO_DEV + file.getOriginalFilename() + "_");
|
||||
//获取版本
|
||||
String version = csTopicService.getVersion(id);
|
||||
int cap = 50 * 1024;
|
||||
byte[] bytes = file.getBytes();
|
||||
int length = bytes.length;
|
||||
//需要分片处理 一帧按50k大小传递
|
||||
if (length > cap){
|
||||
//需要循环的次数
|
||||
int times = bytes.length / cap + 1;
|
||||
for (int i = 1; i <= times; i++) {
|
||||
FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey("uploadFileStep");
|
||||
byte[] lsBytes;
|
||||
if (length > 50*1024) {
|
||||
lsBytes = Arrays.copyOfRange(bytes, (i - 1) * cap, i * cap);
|
||||
ReqAndResDto.Req req = getPojo(i,path,file,bytes.length,lsBytes,(i-1)*cap);
|
||||
publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false);
|
||||
logDto.setOperate(id + "设备上送文件,这是第" + i + "帧");
|
||||
logDto.setResult(1);
|
||||
length = length - cap;
|
||||
//判断是否重发
|
||||
sendNextStep(logDto,path,file,bytes.length,lsBytes,(i-1)*cap,version,id,i);
|
||||
//重发之后判断继续循环还是跳出循环
|
||||
if (!Objects.equals(fileRedisDto.getCode(),200)) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
lsBytes = Arrays.copyOfRange(bytes, (i - 1) * cap, bytes.length);
|
||||
ReqAndResDto.Req req = getPojo(i,path,file,bytes.length,lsBytes,(i-1)*cap);
|
||||
publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false);
|
||||
logDto.setOperate(id + "设备上送文件,这是最后一帧,为第" + i + "帧");
|
||||
logDto.setResult(1);
|
||||
//判断是否重发
|
||||
sendNextStep(logDto,path,file,bytes.length,lsBytes,(i-1)*cap,version,id,i);
|
||||
}
|
||||
csLogsFeignClient.addUserLog(logDto);
|
||||
}
|
||||
} else {
|
||||
ReqAndResDto.Req req = getPojo(1,path,file,length,bytes,0);
|
||||
publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false);
|
||||
logDto.setOperate(id + "系统上送文件,当前文件只有1帧");
|
||||
logDto.setResult(1);
|
||||
csLogsFeignClient.addUserLog(logDto);
|
||||
//判断是否重发
|
||||
sendNextStep(logDto,path,file,length,bytes,0,version,id,1);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
assert logDto != null;
|
||||
logDto.setResult(0);
|
||||
logDto.setFailReason(AccessResponseEnum.UPLOAD_ERROR.getMessage());
|
||||
csLogsFeignClient.addUserLog(logDto);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 上送文件至装置
|
||||
*/
|
||||
public ReqAndResDto.Req getPojo(Integer mid, String path, MultipartFile file, Integer allLength, byte[] bytes, Integer offset) {
|
||||
//组装报文
|
||||
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
|
||||
reqAndResParam.setMid(mid);
|
||||
reqAndResParam.setDid(0);
|
||||
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
||||
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_10.getCode()));
|
||||
reqAndResParam.setExpire(-1);
|
||||
UploadFileDto uploadFileDto = new UploadFileDto();
|
||||
uploadFileDto.setName(path + "/" + file.getOriginalFilename());
|
||||
uploadFileDto.setFileSize(allLength);
|
||||
uploadFileDto.setOffset(offset);
|
||||
uploadFileDto.setLen(bytes.length);
|
||||
uploadFileDto.setData(Base64.getEncoder().encodeToString(bytes));
|
||||
reqAndResParam.setMsg(uploadFileDto);
|
||||
return reqAndResParam;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据装置响应来判断发送的内容
|
||||
*/
|
||||
public void sendNextStep(DeviceLogDTO logDto, String path, MultipartFile file, int length, byte[] bytes, Integer offset, String version, String id, int mid) {
|
||||
try {
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Thread.sleep(100);
|
||||
FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey("uploadFileStep");
|
||||
FileRedisDto fileRedis = new FileRedisDto();
|
||||
if (Objects.nonNull(fileRedisDto.getCode()) && fileRedisDto.getCode().equals(200)) {
|
||||
fileRedis.setCode(200);
|
||||
break;
|
||||
} else {
|
||||
ReqAndResDto.Req req = getPojo(mid,path,file,length,bytes,offset);
|
||||
publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false);
|
||||
logDto.setOperate(id + "系统上送文件,装置响应失败,重新发送,这是第" + (i+1) + "次");
|
||||
logDto.setResult(1);
|
||||
csLogsFeignClient.addUserLog(logDto);
|
||||
fileRedis.setCode(fileRedisDto.getCode());
|
||||
}
|
||||
redisUtil.saveByKeyWithExpire("uploadFileStep",fileRedis,10L);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
assert logDto != null;
|
||||
logDto.setResult(0);
|
||||
logDto.setFailReason(AccessResponseEnum.RELOAD_UPLOAD_ERROR.getMessage());
|
||||
csLogsFeignClient.addUserLog(logDto);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 新增cs_dev_model数据
|
||||
*/
|
||||
|
||||
@@ -113,6 +113,13 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
csLogsFeignClient.addUserLog(logDto);
|
||||
throw new BusinessException(AccessResponseEnum.NDID_NO_FIND);
|
||||
}
|
||||
//判断是否已经注册过
|
||||
if (!Objects.isNull(csEquipmentDeliveryVO.getNdid()) && Objects.equals(type,csEquipmentDeliveryVO.getProcess()) && Objects.equals(AccessEnum.ACCESS.getCode(),csEquipmentDeliveryVO.getStatus())){
|
||||
logDto.setResult(0);
|
||||
logDto.setFailReason(AccessResponseEnum.NDID_SAME_STEP.getMessage());
|
||||
csLogsFeignClient.addUserLog(logDto);
|
||||
throw new BusinessException(AccessResponseEnum.NDID_SAME_STEP);
|
||||
}
|
||||
//2.判断设备是否是直连设备
|
||||
SysDicTreePO sysDicTreePo = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevType()).getData();
|
||||
if (Objects.isNull(sysDicTreePo)){
|
||||
|
||||
@@ -35,6 +35,10 @@ spring:
|
||||
refresh: true
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
servlet:
|
||||
multipart:
|
||||
max-file-size: 100MB
|
||||
max-request-size: 100MB
|
||||
|
||||
#项目日志的配置
|
||||
logging:
|
||||
|
||||
Reference in New Issue
Block a user