From 021d17e6e7663675a9bb5a74a25ff8bb519e55d9 Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Tue, 22 Oct 2024 11:17:30 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/handler/MqttMessageHandler.java | 96 +------------------ .../zlevent/service/impl/FileServiceImpl.java | 12 ++- 2 files changed, 14 insertions(+), 94 deletions(-) diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index d1f431b..2efc399 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -44,6 +44,7 @@ import com.njcn.mq.template.AppFileMessageTemplate; import com.njcn.mq.template.AppFileStreamMessageTemplate; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; +import com.njcn.rt.api.RtFeignClient; import com.njcn.zlevent.api.WaveFeignClient; import com.njcn.zlevent.pojo.dto.WaveTimeDto; import lombok.AllArgsConstructor; @@ -97,6 +98,7 @@ public class MqttMessageHandler { private final OverlimitMapper overlimitMapper; private final ChannelObjectUtil channelObjectUtil; private final WaveFeignClient waveFeignClient; + private final RtFeignClient rtFeignClient; @Autowired Validator validator; @@ -205,95 +207,6 @@ public class MqttMessageHandler { } } -// /** -// * 装置类型模板应答 -// * 1.判断网关的类型 -// * 2.直联设备的DevCfg和DevMod是以直联设备为准,上送平台端,平台端保存。通过校验DevMod模板信息来从平台端模板池中选取对应的模板,如果找不到匹配模板需告警提示人工干预处理。 -// * 3.平台端需读取装置的DevMod来判断网关支持的设备模板(包含设备型号和模板版本),根据app提交的接入子设备DID匹配数据模板(型号及版本),生成DevCfg下发给网关,网关根据下发信息生成就地设备点表。 -// * @param topic -// * @param message -// * @param nDid -// * @param payload -// */ -// @MqttSubscribe(value = "/Pfm/DevRsp/{version}/{edgeId}",qos = 1) -// @Transactional(rollbackFor = Exception.class) -// public void devModelOperation(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload){ -// log.info("收到当前设备所用模板响应--->" + nDid); -// DeviceLogDTO logDto = new DeviceLogDTO(); -// try{ -// logDto.setUserName(RequestUtil.getUsername()); -// logDto.setLoginName(RequestUtil.getLoginName()); -// } catch (Exception e) { -// logDto.setUserName("系统重启或定时任务创建"); -// logDto.setLoginName(null); -// } -// logDto.setOperate(nDid + "设备类型模板应答"); -// logDto.setResult(1); -// //业务处理 -// Gson gson = new Gson(); -// ModelDto modelDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ModelDto.class); -// if (Objects.equals(modelDto.getType(),Integer.parseInt(TypeEnum.TYPE_18.getCode()))){ -// List list = modelDto.getMsg().getDevMod(); -// List list2 = modelDto.getMsg().getDevCfg(); -// if (CollectionUtils.isEmpty(list)){ -// log.error(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); -// logDto.setResult(0); -// logDto.setFailReason(AccessResponseEnum.MODEL_VERSION_ERROR.getMessage()); -// csLogsFeignClient.addUserLog(logDto); -// //有异常删除缓存的模板信息 -// redisUtil.delete(AppRedisKey.MODEL + nDid); -// throw new BusinessException(AccessResponseEnum.MODEL_VERSION_ERROR); -// } -// //校验前置传递的装置模板库中是否存在 -// List modelList = new ArrayList<>(); -// list.forEach(item->{ -// Integer did = null; -// for (DevCfgDto item2 : list2) { -// if (Objects.equals(item.getDevType(),item2.getDevType())){ -// did = item2.getDid(); -// } -// } -// CsModelDto csModelDto = new CsModelDto(); -// CsDevModelPO po = devModelFeignClient.findModel(item.getDevType(),item.getVersionNo(),item.getVersionDate()).getData(); -// if (Objects.isNull(po)){ -// log.error(AccessResponseEnum.MODEL_NO_FIND.getMessage()); -// logDto.setResult(0); -// logDto.setFailReason(AccessResponseEnum.MODEL_NO_FIND.getMessage()); -// csLogsFeignClient.addUserLog(logDto); -// //有异常删除缓存的模板信息 -// redisUtil.delete(AppRedisKey.MODEL + nDid); -// throw new BusinessException(AccessResponseEnum.MODEL_NO_FIND); -// } -// if (Objects.equals(po.getType(),0)){ -// List dataSetList = dataSetFeignClient.getModuleDataSet(po.getId()).getData(); -// if (CollectionUtils.isEmpty(dataSetList)){ -// logDto.setResult(0); -// logDto.setFailReason(AccessResponseEnum.MODULE_NUMBER_IS_NULL.getMessage()); -// csLogsFeignClient.addUserLog(logDto); -// //有异常删除缓存的模板信息 -// redisUtil.delete(AppRedisKey.MODEL + nDid); -// throw new BusinessException(AccessResponseEnum.MODULE_NUMBER_IS_NULL); -// } -// csModelDto.setModuleNumber(dataSetList.size()); -// } -// csModelDto.setDevType(po.getDevTypeName()); -// csModelDto.setModelId(po.getId()); -// csModelDto.setDid(did); -// csModelDto.setType(po.getType()); -// modelList.add(csModelDto); -// }); -// //存储模板id -// String key2 = AppRedisKey.MODEL + nDid; -// redisUtil.saveByKeyWithExpire(key2,modelList,600L); -// //存储监测点模板信息,用于界面回显 -// List modelId = modelList.stream().map(CsModelDto::getModelId).collect(Collectors.toList()); -// List lineList = csLineModelService.getMonitorNumByModelId(modelId); -// String key = AppRedisKey.LINE + nDid; -// redisUtil.saveByKeyWithExpire(key,lineList,600L); -// //csLogsFeignClient.addUserLog(logDto); -// } -// } - /** * 设备响应 * @param topic @@ -626,10 +539,7 @@ public class MqttMessageHandler { JSONObject jsonObject2 = JSONObject.parseObject(JSON.toJSONString(dataDto)); AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject2, AppAutoDataMessage.class); appAutoDataMessage.setId(nDid); - appAutoDataMessage.getMsg().getDataArray().forEach(item->{ - log.info(nDid + "处理实时数据" + item.getDataAttr()); - }); - appAutoDataMessageTemplate.sendMember(appAutoDataMessage); + rtFeignClient.analysis(appAutoDataMessage); break; //处理主动上送的统计数据 case 2: diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java index 53d70ba..a00d586 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -64,12 +64,12 @@ public class FileServiceImpl implements IFileService { private final ICsWaveAnalysisService iCsWaveAnalysisService; private final ChannelObjectUtil channelObjectUtil; private final RemoveInfoUtils removeInfoUtils; + private static Integer mid = 1; @Override public void analysisFileInfo(AppFileMessage appFileMessage) { if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){ DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); - int mid = 1; int range = 51200; String fileName = appFileMessage.getMsg().getFileInfo().getName(); //缓存文件信息用于文件流拼接 @@ -100,10 +100,20 @@ public class FileServiceImpl implements IFileService { csWave.setCheckNumber(appFileMessage.getMsg().getFileInfo().getFileCheck()); csWave.setStatus(0); csWaveService.save(csWave); + //获取mid + Object object = channelObjectUtil.getDeviceMid(appFileMessage.getId()); + if (!Objects.isNull(object)) { + mid = (Integer) object; + } //请求当前文件的数据 askFileStream(appFileMessage.getId(),mid,fileName,-1,range); redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto); redisUtil.delete(AppRedisKey.TIME+fileName); + mid = mid + 1; + if (mid > 10000) { + mid = 1; + } + redisUtil.saveByKey(AppRedisKey.DEVICE_MID + appFileMessage.getId(),mid); } } else { throw new BusinessException(AccessResponseEnum.RESPONSE_ERROR);