diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index 208a9ba..d3fd6fe 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -58,6 +58,8 @@ public enum AccessResponseEnum { MODEL_MISS("A0308","模板信息缺失!"), MODEL_VERSION_ERROR("A0308","询问装置模板信息错误"), + UPLOAD_ERROR("A0308","平台上送文件异常"), + RELOAD_UPLOAD_ERROR("A0308","平台重新上送文件异常"), CLDID_IS_NULL("A0309","逻辑子设备标识为空"), MODULE_NUMBER_IS_NULL("A0309","设备子模块个数为空"), diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/UploadFileDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/UploadFileDto.java new file mode 100644 index 0000000..80b3dd5 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/UploadFileDto.java @@ -0,0 +1,33 @@ +package com.njcn.access.pojo.dto; + +import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author xy + */ +@Data +public class UploadFileDto { + + @SerializedName("Name") + @ApiModelProperty("文件名称,全路径") + private String name; + + @SerializedName("FileSize") + @ApiModelProperty("文件总大小(单位字节)") + private Integer fileSize; + + @SerializedName("Offset") + @ApiModelProperty("当前上送数据包在文件中偏移位置") + private Integer offset; + + @SerializedName("Len") + @ApiModelProperty("当前上送数据包长度") + private Integer len; + + @SerializedName("Data") + @ApiModelProperty("文件包数据") + private String data; + +} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileRedisDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileRedisDto.java new file mode 100644 index 0000000..1dfbcd0 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/file/FileRedisDto.java @@ -0,0 +1,22 @@ +package com.njcn.access.pojo.dto.file; + +import lombok.Data; + +/** + * @author xy + */ +@Data +public class FileRedisDto { + + private String name; + + private Integer allStep; + + private Integer nowStep; + + private Integer retry = 0; + + private Integer code; + + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDevModelController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDevModelController.java index 241ea45..2360812 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDevModelController.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDevModelController.java @@ -13,10 +13,7 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestPart; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; /** @@ -48,5 +45,14 @@ public class CsDevModelController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/uploadDevFile") + @ApiOperation("上传文件至装置") + @Transactional(rollbackFor = {Exception.class}) + public HttpResult uploadDevFile(@RequestPart("file") @Validated MultipartFile file, @RequestParam("id") @Validated String id, @RequestParam("filePath") @Validated String path){ + String methodDescribe = getMethodDescribe("uploadDevFile"); + csDevModelService.uploadDevFile(file,id,path); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index a62c48e..450b28a 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -17,6 +17,7 @@ import com.njcn.access.enums.TypeEnum; import com.njcn.access.pojo.RspDataDto; import com.njcn.access.pojo.dto.*; import com.njcn.access.pojo.dto.file.FileDto; +import com.njcn.access.pojo.dto.file.FileRedisDto; import com.njcn.access.pojo.param.ReqAndResParam; import com.njcn.access.pojo.po.CsDeviceOnlineLogs; import com.njcn.access.pojo.po.CsLineModel; @@ -517,10 +518,12 @@ public class MqttMessageHandler { break; //处理主动上送的统计数据 case 2: - log.info(nDid + "处理统计数据"); JSONObject jsonObject2 = JSONObject.parseObject(JSON.toJSONString(dataDto)); AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject2, AppAutoDataMessage.class); appAutoDataMessage.setId(nDid); + appAutoDataMessage.getMsg().getDataArray().forEach(item->{ + log.info(nDid + "处理统计数据" + item.getDataAttr()); + }); appAutoDataMessageTemplate.sendMember(appAutoDataMessage); break; default: @@ -566,6 +569,12 @@ public class MqttMessageHandler { log.info("需要缓存请求的文件信息"); } break; + case 4659: + log.info("装置收到系统上传的文件"); + FileRedisDto fileRedisDto = new FileRedisDto(); + fileRedisDto.setCode(fileDto.getCode()); + redisUtil.saveByKeyWithExpire("uploadFileStep",fileRedisDto,10L); + break; default: break; } @@ -583,7 +592,7 @@ public class MqttMessageHandler { reqAndResParam.setMid(mid); reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_6.getCode())); - reqAndResParam.setExpire(1); + reqAndResParam.setExpire(-1); AskDataDto askDataDto = new AskDataDto(); askDataDto.setDataAttr(0); askDataDto.setOperate(1); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelService.java index 57a90b8..c44bb11 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDevModelService.java @@ -26,4 +26,11 @@ public interface ICsDevModelService { */ void addDict(MultipartFile file); + /** + * 上传文件至装置 + * 更新程序、更新模板 + */ + void uploadDevFile(MultipartFile file, String id, String path); + + } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index a2c0bad..b2752ae 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -3,12 +3,18 @@ package com.njcn.access.service.impl; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.IdUtil; import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.github.tocrhz.mqtt.publisher.MqttPublisher; +import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessResponseEnum; import com.njcn.access.enums.DataModel; import com.njcn.access.enums.TypeEnum; +import com.njcn.access.handler.MqttMessageHandler; import com.njcn.access.mapper.CsDevModelMapper; +import com.njcn.access.pojo.dto.ReqAndResDto; +import com.njcn.access.pojo.dto.UploadFileDto; import com.njcn.access.pojo.dto.data.*; import com.njcn.access.pojo.dto.devModel.*; +import com.njcn.access.pojo.dto.file.FileRedisDto; import com.njcn.access.pojo.po.CsLineModel; import com.njcn.access.service.*; import com.njcn.access.utils.JsonUtil; @@ -17,11 +23,12 @@ import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.CsLogsFeignClient; import com.njcn.csdevice.api.DevModelFeignClient; import com.njcn.csdevice.pojo.po.*; +import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO; import com.njcn.oss.constant.OssPath; import com.njcn.oss.utils.FileStorageUtil; +import com.njcn.redis.utils.RedisUtil; import com.njcn.system.api.*; import com.njcn.system.enums.DicDataEnum; -import com.njcn.system.enums.DicTreeEnum; import com.njcn.system.pojo.param.CsWaveParam; import com.njcn.system.pojo.param.EleEpdPqdParam; import com.njcn.system.pojo.param.EleEvtParam; @@ -34,6 +41,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.multipart.MultipartFile; +import java.io.IOException; import java.sql.Date; import java.util.*; import java.util.concurrent.atomic.AtomicReference; @@ -79,6 +87,16 @@ public class CsDevModelServiceImpl implements ICsDevModelService { private final DictTreeFeignClient dictTreeFeignClient; + private final MqttPublisher publisher; + + private final ICsTopicService csTopicService; + + private final ICsEquipmentDeliveryService csEquipmentDeliveryService; + + private final RedisUtil redisUtil; + + private final MqttMessageHandler mqttMessageHandler; + @Override @Transactional(rollbackFor = {Exception.class}) public void addModel(MultipartFile file) { @@ -134,6 +152,130 @@ public class CsDevModelServiceImpl implements ICsDevModelService { } } + @Override + public void uploadDevFile(MultipartFile file,String id,String path) { + DeviceLogDTO logDto = null; + try { + //判断nDid是否存在 + CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(id); + logDto = new DeviceLogDTO(); + logDto.setUserName(RequestUtil.getUserNickname()); + logDto.setLoginName(RequestUtil.getUsername()); + if (Objects.isNull(csEquipmentDeliveryVO.getNdid())) { + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.NDID_NO_FIND.getMessage()); + csLogsFeignClient.addUserLog(logDto); + throw new BusinessException(AccessResponseEnum.NDID_NO_FIND); + } + //存储文件至文件服务器 + fileStorageUtil.uploadMultipart(file, OssPath.SYSTEM_TO_DEV + file.getOriginalFilename() + "_"); + //获取版本 + String version = csTopicService.getVersion(id); + int cap = 50 * 1024; + byte[] bytes = file.getBytes(); + int length = bytes.length; + //需要分片处理 一帧按50k大小传递 + if (length > cap){ + //需要循环的次数 + int times = bytes.length / cap + 1; + for (int i = 1; i <= times; i++) { + FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey("uploadFileStep"); + byte[] lsBytes; + if (length > 50*1024) { + lsBytes = Arrays.copyOfRange(bytes, (i - 1) * cap, i * cap); + ReqAndResDto.Req req = getPojo(i,path,file,bytes.length,lsBytes,(i-1)*cap); + publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false); + logDto.setOperate(id + "设备上送文件,这是第" + i + "帧"); + logDto.setResult(1); + length = length - cap; + //判断是否重发 + sendNextStep(logDto,path,file,bytes.length,lsBytes,(i-1)*cap,version,id,i); + //重发之后判断继续循环还是跳出循环 + if (!Objects.equals(fileRedisDto.getCode(),200)) { + break; + } + } else { + lsBytes = Arrays.copyOfRange(bytes, (i - 1) * cap, bytes.length); + ReqAndResDto.Req req = getPojo(i,path,file,bytes.length,lsBytes,(i-1)*cap); + publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false); + logDto.setOperate(id + "设备上送文件,这是最后一帧,为第" + i + "帧"); + logDto.setResult(1); + //判断是否重发 + sendNextStep(logDto,path,file,bytes.length,lsBytes,(i-1)*cap,version,id,i); + } + csLogsFeignClient.addUserLog(logDto); + } + } else { + ReqAndResDto.Req req = getPojo(1,path,file,length,bytes,0); + publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false); + logDto.setOperate(id + "系统上送文件,当前文件只有1帧"); + logDto.setResult(1); + csLogsFeignClient.addUserLog(logDto); + //判断是否重发 + sendNextStep(logDto,path,file,length,bytes,0,version,id,1); + } + } catch (Exception e) { + assert logDto != null; + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.UPLOAD_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); + throw new RuntimeException(e); + } + } + + /** + * 上送文件至装置 + */ + public ReqAndResDto.Req getPojo(Integer mid, String path, MultipartFile file, Integer allLength, byte[] bytes, Integer offset) { + //组装报文 + ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); + reqAndResParam.setMid(mid); + reqAndResParam.setDid(0); + reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_10.getCode())); + reqAndResParam.setExpire(-1); + UploadFileDto uploadFileDto = new UploadFileDto(); + uploadFileDto.setName(path + "/" + file.getOriginalFilename()); + uploadFileDto.setFileSize(allLength); + uploadFileDto.setOffset(offset); + uploadFileDto.setLen(bytes.length); + uploadFileDto.setData(Base64.getEncoder().encodeToString(bytes)); + reqAndResParam.setMsg(uploadFileDto); + return reqAndResParam; + } + + /** + * 根据装置响应来判断发送的内容 + */ + public void sendNextStep(DeviceLogDTO logDto, String path, MultipartFile file, int length, byte[] bytes, Integer offset, String version, String id, int mid) { + try { + for (int i = 0; i < 3; i++) { + Thread.sleep(100); + FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey("uploadFileStep"); + FileRedisDto fileRedis = new FileRedisDto(); + if (Objects.nonNull(fileRedisDto.getCode()) && fileRedisDto.getCode().equals(200)) { + fileRedis.setCode(200); + break; + } else { + ReqAndResDto.Req req = getPojo(mid,path,file,length,bytes,offset); + publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false); + logDto.setOperate(id + "系统上送文件,装置响应失败,重新发送,这是第" + (i+1) + "次"); + logDto.setResult(1); + csLogsFeignClient.addUserLog(logDto); + fileRedis.setCode(fileRedisDto.getCode()); + } + redisUtil.saveByKeyWithExpire("uploadFileStep",fileRedis,10L); + } + } catch (InterruptedException e) { + assert logDto != null; + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.RELOAD_UPLOAD_ERROR.getMessage()); + csLogsFeignClient.addUserLog(logDto); + throw new RuntimeException(e); + } + } + + /** * 新增cs_dev_model数据 */ diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index e86080c..2e7d0a2 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -113,6 +113,13 @@ public class CsDeviceServiceImpl implements ICsDeviceService { csLogsFeignClient.addUserLog(logDto); throw new BusinessException(AccessResponseEnum.NDID_NO_FIND); } + //判断是否已经注册过 + if (!Objects.isNull(csEquipmentDeliveryVO.getNdid()) && Objects.equals(type,csEquipmentDeliveryVO.getProcess()) && Objects.equals(AccessEnum.ACCESS.getCode(),csEquipmentDeliveryVO.getStatus())){ + logDto.setResult(0); + logDto.setFailReason(AccessResponseEnum.NDID_SAME_STEP.getMessage()); + csLogsFeignClient.addUserLog(logDto); + throw new BusinessException(AccessResponseEnum.NDID_SAME_STEP); + } //2.判断设备是否是直连设备 SysDicTreePO sysDicTreePo = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevType()).getData(); if (Objects.isNull(sysDicTreePo)){ diff --git a/iot-access/access-boot/src/main/resources/bootstrap.yml b/iot-access/access-boot/src/main/resources/bootstrap.yml index eadfba5..48e3e85 100644 --- a/iot-access/access-boot/src/main/resources/bootstrap.yml +++ b/iot-access/access-boot/src/main/resources/bootstrap.yml @@ -35,6 +35,10 @@ spring: refresh: true main: allow-bean-definition-overriding: true + servlet: + multipart: + max-file-size: 100MB + max-request-size: 100MB #项目日志的配置 logging: