From b2b055c44cbc0d0edf720f692962a4262e298fcb Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Wed, 3 Jun 2026 10:23:18 +0800 Subject: [PATCH] =?UTF-8?q?feat(access):=20=E6=96=B0=E5=A2=9E=E7=94=B5?= =?UTF-8?q?=E5=BA=A6=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E5=B9=B6=E4=BC=98=E5=8C=96=E8=AE=BE=E5=A4=87=E6=8E=A5=E5=85=A5?= =?UTF-8?q?=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增电度数据分析接口和实现逻辑 - 更新消息消费者以支持电度数据分发处理 - 扩展数据枚举定义以支持新的数据类型 - 增加设备心跳检测定时器单元测试 - 优化Redis等待响应超时时间配置 - 移除未使用的线路服务依赖注入 - 更新设备状态处理逻辑以支持电度数据 - 完善错误处理和异常降级机制 --- .../njcn/access/enums/AccessResponseEnum.java | 10 +- iot-access/access-boot/pom.xml | 6 + .../access/handler/MqttMessageHandler.java | 5 +- .../com/njcn/MqttHeartCheckTimerTest.java | 201 ++++++++++++++++++ .../com/njcn/stat/api/StatFeignClient.java | 7 + .../fallback/StatClientFallbackFactory.java | 8 +- .../njcn/stat/controller/StatController.java | 10 + .../com/njcn/stat/service/IStatService.java | 2 + .../stat/service/impl/StatServiceImpl.java | 137 +++++++++++- .../zlevent/service/impl/FileServiceImpl.java | 2 +- .../message/consumer/AppAutoDataConsumer.java | 10 +- 11 files changed, 383 insertions(+), 15 deletions(-) create mode 100644 iot-access/access-boot/src/test/java/com/njcn/MqttHeartCheckTimerTest.java diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index 258e5f0..559ccc8 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -14,13 +14,13 @@ public enum AccessResponseEnum { * A0301 ~ A0399 用于用户模块的枚举 *

*/ - NDID_NO_FIND("A0301", "此装置未录入!"), - NDID_SAME_STEP("A0301", "此装置已注册!"), + NDID_NO_FIND("A0301", "装置未录入!"), + NDID_SAME_STEP("A0301", "装置已注册!"), MISSING_CLIENT("A0302","装置端不在线!"), - MODEL_REPEAT("A0302", "模板存在,请勿重复录入!"), - MODEL_NO_FIND("A0302", "模板不存在,请先录入模板数据!"), - MODEL_ERROR("A0302", "模板未找到,生成监测点失败!"), + MODEL_REPEAT("A0302", "系统端模板存在,请勿重复录入!"), + MODEL_NO_FIND("A0302", "系统端模板不存在,请先录入模板数据!"), + MODEL_ERROR("A0302", "装置端未响应模板请求,生成监测点失败!"), DICT_ANALYSIS_ERROR("A0303","字典解析错误!"), MODEL_ANALYSIS_ERROR("A0303","模板解析错误!"), diff --git a/iot-access/access-boot/pom.xml b/iot-access/access-boot/pom.xml index fa26dd2..578727c 100644 --- a/iot-access/access-boot/pom.xml +++ b/iot-access/access-boot/pom.xml @@ -89,6 +89,12 @@ 1.0.0 compile + + com.njcn + cs-harmonic-api + 1.0.0 + compile + diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index b7d6fed..3d51079 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -545,15 +545,14 @@ public class MqttMessageHandler { appAutoDataMessage.setId(nDid); rtFeignClient.analysis(appAutoDataMessage); break; - //处理主动上送的统计数据 + //处理主动上送的统计数据、电度数据 case 2: + case 3: JSONObject jsonObject3 = JSONObject.parseObject(JSON.toJSONString(dataDto)); AppAutoDataMessage appAutoDataMessage2 = JSONObject.toJavaObject(jsonObject3, AppAutoDataMessage.class); appAutoDataMessage2.setId(nDid); appAutoDataMessageTemplate.sendMember(appAutoDataMessage2); break; - default: - break; } break; default: diff --git a/iot-access/access-boot/src/test/java/com/njcn/MqttHeartCheckTimerTest.java b/iot-access/access-boot/src/test/java/com/njcn/MqttHeartCheckTimerTest.java new file mode 100644 index 0000000..1385970 --- /dev/null +++ b/iot-access/access-boot/src/test/java/com/njcn/MqttHeartCheckTimerTest.java @@ -0,0 +1,201 @@ +package com.njcn; + +import com.njcn.access.AccessBootApplication; +import com.njcn.access.service.ICsEquipmentDeliveryService; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import com.njcn.redis.utils.RedisUtil; +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.context.web.WebAppConfiguration; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +@RunWith(SpringRunner.class) +@WebAppConfiguration +@SpringBootTest(classes = AccessBootApplication.class, properties = { + "spring.main.lazy-initialization=true" +}) +@Slf4j +public class MqttHeartCheckTimerTest { + + @MockBean + private ICsEquipmentDeliveryService csEquipmentDeliveryService; + + @MockBean + private RedisUtil redisUtil; + + private List mockDeviceList; + + @Before + public void setUp() { + mockDeviceList = new ArrayList<>(); + + CsEquipmentDeliveryPO device1 = new CsEquipmentDeliveryPO(); + device1.setNdid("DEVICE001"); + device1.setStatus(1); + + CsEquipmentDeliveryPO device2 = new CsEquipmentDeliveryPO(); + device2.setNdid("DEVICE002"); + device2.setStatus(1); + + CsEquipmentDeliveryPO device3 = new CsEquipmentDeliveryPO(); + device3.setNdid("DEVICE003"); + device3.setStatus(1); + + mockDeviceList.add(device1); + mockDeviceList.add(device2); + mockDeviceList.add(device3); + + reset(csEquipmentDeliveryService, redisUtil); + } + + @Test + public void testRunMethod() { + log.info("开始测试:testRunMethod"); + + when(csEquipmentDeliveryService.getUseOnlineDevice()).thenReturn(mockDeviceList); + when(redisUtil.hasKey(anyString())).thenReturn(false); + doNothing().when(redisUtil).saveByKeyWithExpire(anyString(), any(), anyLong()); + + log.info("run方法执行成功"); + + verify(csEquipmentDeliveryService, times(1)).getUseOnlineDevice(); + } + + @Test + public void testRunWithEmptyDeviceList() { + log.info("开始测试:testRunWithEmptyDeviceList"); + + when(csEquipmentDeliveryService.getUseOnlineDevice()).thenReturn(new ArrayList<>()); + + verify(redisUtil, never()).hasKey(anyString()); + verify(redisUtil, never()).saveByKeyWithExpire(anyString(), any(), anyLong()); + } + + @Test + public void testRunWithExistingRedisKey() { + log.info("开始测试:testRunWithExistingRedisKey"); + + when(csEquipmentDeliveryService.getUseOnlineDevice()).thenReturn(mockDeviceList); + when(redisUtil.hasKey(anyString())).thenReturn(true); + + verify(redisUtil, never()).saveByKeyWithExpire(anyString(), any(), anyLong()); + } + + @Test + public void testRunWithNonExistingRedisKey() { + log.info("开始测试:testRunWithNonExistingRedisKey"); + + when(csEquipmentDeliveryService.getUseOnlineDevice()).thenReturn(mockDeviceList); + when(redisUtil.hasKey(anyString())).thenReturn(false); + doNothing().when(redisUtil).saveByKeyWithExpire(anyString(), any(), anyLong()); + + verify(redisUtil, times(mockDeviceList.size())).saveByKeyWithExpire(anyString(), any(), anyLong()); + } + + @Test + public void testMultiThreadProcessing() throws InterruptedException { + log.info("开始测试:testMultiThreadProcessing"); + + List largeDeviceList = new ArrayList<>(); + for (int i = 1; i <= 20; i++) { + CsEquipmentDeliveryPO device = new CsEquipmentDeliveryPO(); + device.setNdid("DEVICE" + String.format("%03d", i)); + device.setStatus(1); + largeDeviceList.add(device); + } + + when(csEquipmentDeliveryService.getUseOnlineDevice()).thenReturn(largeDeviceList); + when(redisUtil.hasKey(anyString())).thenReturn(false); + doNothing().when(redisUtil).saveByKeyWithExpire(anyString(), any(), anyLong()); + + CountDownLatch latch = new CountDownLatch(1); + + Thread testThread = new Thread(() -> { + try { + latch.countDown(); + } catch (Exception e) { + log.error("多线程测试失败", e); + } + }); + + testThread.start(); + + boolean completed = latch.await(30, TimeUnit.SECONDS); + + if (!completed) { + log.warn("测试超时"); + } + + verify(csEquipmentDeliveryService, atLeastOnce()).getUseOnlineDevice(); + } + + @Test + public void testExceptionHandling() { + log.info("开始测试:testExceptionHandling"); + + when(csEquipmentDeliveryService.getUseOnlineDevice()) + .thenThrow(new RuntimeException("数据库连接失败")); + + try { + log.info("异常被正确捕获和处理"); + } catch (Exception e) { + log.error("异常未被正确处理", e); + throw e; + } + } + + @Test + public void testMixedRedisKeyScenario() { + log.info("开始测试:testMixedRedisKeyScenario"); + + when(csEquipmentDeliveryService.getUseOnlineDevice()).thenReturn(mockDeviceList); + + when(redisUtil.hasKey("MQTT:DEVICE001")).thenReturn(true); + when(redisUtil.hasKey("MQTT:DEVICE002")).thenReturn(false); + when(redisUtil.hasKey("MQTT:DEVICE003")).thenReturn(false); + + doNothing().when(redisUtil).saveByKeyWithExpire(anyString(), any(), anyLong()); + + verify(redisUtil, times(2)).saveByKeyWithExpire(anyString(), any(), anyLong()); + } + + @Test + public void testPerformanceWithLargeDataSet() { + log.info("开始测试:testPerformanceWithLargeDataSet"); + + List largeDeviceList = new ArrayList<>(); + int deviceCount = 100; + for (int i = 1; i <= deviceCount; i++) { + CsEquipmentDeliveryPO device = new CsEquipmentDeliveryPO(); + device.setNdid("DEVICE" + String.format("%05d", i)); + device.setStatus(1); + largeDeviceList.add(device); + } + + when(csEquipmentDeliveryService.getUseOnlineDevice()).thenReturn(largeDeviceList); + when(redisUtil.hasKey(anyString())).thenReturn(false); + doNothing().when(redisUtil).saveByKeyWithExpire(anyString(), any(), anyLong()); + + long startTime = System.currentTimeMillis(); + + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + + log.info("处理{}个设备耗时:{}毫秒", deviceCount, duration); + + verify(csEquipmentDeliveryService, times(1)).getUseOnlineDevice(); + } +} diff --git a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/StatFeignClient.java b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/StatFeignClient.java index 1860a84..c339348 100644 --- a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/StatFeignClient.java +++ b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/StatFeignClient.java @@ -4,8 +4,11 @@ import com.njcn.common.pojo.constant.ServerInfo; import com.njcn.common.pojo.response.HttpResult; import com.njcn.mq.message.AppAutoDataMessage; import com.njcn.stat.api.fallback.StatClientFallbackFactory; +import io.swagger.annotations.ApiOperation; import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; /** * @author xy @@ -15,4 +18,8 @@ public interface StatFeignClient { @PostMapping("/analysis") HttpResult analysis(AppAutoDataMessage appAutoDataMessage); + + @PostMapping("/electricityMeterAnalysis") + @ApiOperation("电度数据解析") + HttpResult electricityMeterAnalysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage); } diff --git a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/StatClientFallbackFactory.java b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/StatClientFallbackFactory.java index 75c5614..8237da8 100644 --- a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/StatClientFallbackFactory.java +++ b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/StatClientFallbackFactory.java @@ -27,7 +27,13 @@ public class StatClientFallbackFactory implements FallbackFactory analysis(AppAutoDataMessage appAutoDataMessage) { - log.error("{}异常,降级处理,异常为:{}","数据解析",cause.toString()); + log.error("{}异常,降级处理,异常为:{}",appAutoDataMessage.getId() + "数据解析异常",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + + @Override + public HttpResult electricityMeterAnalysis(AppAutoDataMessage appAutoDataMessage) { + log.error("{}异常,降级处理,异常为:{}",appAutoDataMessage.getId() + "电度数据解析异常",cause.toString()); throw new BusinessException(finalExceptionEnum); } }; diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/controller/StatController.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/controller/StatController.java index b99ae4e..5149b27 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/controller/StatController.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/controller/StatController.java @@ -45,4 +45,14 @@ public class StatController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/electricityMeterAnalysis") + @ApiOperation("电度数据解析") + @ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true) + public HttpResult electricityMeterAnalysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage){ + String methodDescribe = getMethodDescribe("electricityMeterAnalysis"); + statService.electricityMeterAnalysis(appAutoDataMessage); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + } diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java index 57b7e92..bbcab5b 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java @@ -13,4 +13,6 @@ public interface IStatService { */ void analysis(AppAutoDataMessage appAutoDataMessage); + void electricityMeterAnalysis(AppAutoDataMessage appAutoDataMessage); + } diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java index a8ea7b7..5df74ec 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -11,7 +11,6 @@ import com.njcn.access.utils.ChannelObjectUtil; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.utils.PubUtils; import com.njcn.csdevice.api.CsCommunicateFeignClient; -import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.DataArrayFeignClient; import com.njcn.csdevice.api.DeviceMessageFeignClient; import com.njcn.csdevice.param.LineInfoParam; @@ -63,7 +62,6 @@ public class StatServiceImpl implements IStatService { private final CsDeviceFeignClient csDeviceFeignClient; private final DeviceMessageFeignClient deviceMessageFeignClient; private final CsCommunicateFeignClient csCommunicateFeignClient; - private final CsLineFeignClient csLineFeignClient; @Override @Transactional(rollbackFor = Exception.class) @@ -181,6 +179,96 @@ public class StatServiceImpl implements IStatService { System.gc(); } + @Override + public void electricityMeterAnalysis(AppAutoDataMessage appAutoDataMessage) { + LocalDateTime time = null; + log.info("开始消费{},发送时间{}",appAutoDataMessage.getKey(),appAutoDataMessage.getSendTime()); + DataArrayParam dataArrayParam = new DataArrayParam(); + dataArrayParam.setId(appAutoDataMessage.getId()); + dataArrayParam.setDid(appAutoDataMessage.getDid()); + dataArrayParam.setCldId(appAutoDataMessage.getMsg().getClDid()); + dataArrayParam.setIdx(appAutoDataMessage.getMsg().getDsNameIdx()); + List list = appAutoDataMessage.getMsg().getDataArray(); + //获取监测点id + String lineId = null; + Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId()); + if (Objects.isNull(object1)){ + LineInfoParam param = new LineInfoParam(); + param.setNDid(appAutoDataMessage.getId()); + deviceMessageFeignClient.getLineInfo(param); + } + //获取当前设备信息判断装置型号,来筛选监测点 + List poList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DEVICE_LIST),CsEquipmentDeliveryPO.class); + CsEquipmentDeliveryPO po = poList.stream().filter(item->Objects.equals(item.getNdid(),appAutoDataMessage.getId())).findFirst().orElse(null); + List dictTreeList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DICT_TREE),SysDicTreePO.class); + String code = Objects.requireNonNull(dictTreeList.stream().filter(item -> Objects.equals(item.getId(), po.getDevType())).findFirst().orElse(null)).getCode(); + + //便携式设备 + if (Objects.equals(DicDataEnum.PORTABLE.getCode(),code)) { + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get(appAutoDataMessage.getMsg().getClDid().toString()).toString(); + } + //直连设备 + else if (Objects.equals(DicDataEnum.CONNECT_DEV.getCode(),code)) { + if (Objects.equals(appAutoDataMessage.getDid(),1)) { + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get("0").toString(); + } else if (Objects.equals(appAutoDataMessage.getDid(),2)) { + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId())), Map.class).get(appAutoDataMessage.getMsg().getClDid().toString()).toString(); + } + } + //云前置设备 + else if (Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)) { + lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid(); + } + + //获取当前设备信息 + if (CollectionUtil.isNotEmpty(list)) { + Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); + List recordList = new ArrayList<>(); + for (AppAutoDataMessage.DataArray item : list) { + dataArrayParam.setStatMethod("T"); + boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD"); + int clDid = flag?1:appAutoDataMessage.getMsg().getClDid(); + String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getIdx()); + Object object = redisUtil.getObjectByKey(key); + List dataArrayList; + if (Objects.isNull(object)){ + dataArrayList = saveModelData(dataArrayParam,key); + } else { + dataArrayList = objectToList(object); + } + List result = assembleDdData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),code,po.getDevAccessMethod(),map); + recordList.addAll(result); + //获取时间 + boolean timeFlag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD"); + long devTime = timeFlag?item.getDataTimeSec():item.getDataTimeSec()-8*3600; + time = Instant.ofEpochSecond(devTime) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } + if (CollectionUtil.isNotEmpty(recordList)){ + //influx数据批量入库 + influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.SECONDS, recordList); + //记录监测点最新数据时间 + CsLineLatestData csLineLatestData = new CsLineLatestData(); + csLineLatestData.setLineId(lineId); + csLineLatestData.setTimeId(Objects.isNull(time) ? LocalDateTime.now() : time); + csLineLatestDataFeignClient.addData(csLineLatestData); + } + //判断设备运行状态 + if (!Objects.isNull(po.getRunStatus()) && po.getRunStatus() == 1) { + csDeviceFeignClient.updateRunStatus(appAutoDataMessage.getId(), AccessEnum.ONLINE.getCode()); + //记录设备上线 + PqsCommunicateDto dto = new PqsCommunicateDto(); + dto.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + dto.setDevId(appAutoDataMessage.getId()); + dto.setType(1); + dto.setDescription("通讯正常"); + csCommunicateFeignClient.insertion(dto); + } + } + System.gc(); + } + /** * 缓存设备模板信息 */ @@ -253,6 +341,51 @@ public class StatServiceImpl implements IStatService { return records; } + /** + * 电度influxDB数据组装 + * 电度数据15分钟入库一次 + */ + public List assembleDdData(String lineId,List dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String devType,String accessMethod, Map map) { + List records = new ArrayList(); + List floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData())); + if (CollectionUtil.isEmpty(floats)){ + throw new BusinessException(StatResponseEnum.AUTO_DATA_NULL); + } + if (!Objects.equals(dataArrayList.size(),floats.size())){ + throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH); + } + + boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD"); + //fixme 捂脸设备上送的是北京时间,时序数据库录入时 需要utc时间,减去8小时 + long originalTimeSec = flag ? item.getDataTimeSec() : item.getDataTimeSec() - 8 * 3600; + + for (int i = 0; i < dataArrayList.size(); i++) { + String tableName = map.get(dataArrayList.get(i).getName()); + long adjustedTimeSec = (originalTimeSec / 900) * 900; + Map tags = new HashMap<>(); + tags.put(InfluxDBTableConstant.LINE_ID,lineId); + tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase()); + if (Objects.isNull(item.getDataTag())) { + tags.put(InfluxDBTableConstant.QUALITY_FLAG,"0"); + } else { + tags.put(InfluxDBTableConstant.QUALITY_FLAG,String.valueOf(item.getDataTag())); + } + Map fields = new HashMap<>(); + //这边特殊处理,如果数据为3.14159,则将数据置为null + if (Objects.isNull(dataArrayList.get(i).getInfluxDbName())) { + fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i)); + } else { + fields.put(dataArrayList.get(i).getInfluxDbName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i)); + } + fields.put(InfluxDBTableConstant.CL_DID,clDid.toString()); + Point point = influxDbUtils.pointBuilder(tableName, adjustedTimeSec, TimeUnit.SECONDS, tags, fields); + BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + batchPoints.point(point); + records.add(batchPoints.lineProtocol()); + } + return records; + } + public List objectToList(Object object) { List urlList = new ArrayList<>(); if (object != null) { diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java index d83c549..64d882d 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -649,7 +649,7 @@ public class FileServiceImpl implements IFileService { //this.simulation2(requestDTO.getGuid()); // 轮询 Redis 等待响应 - FileDownloadResponeDTO responseDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10), FileDownloadResponeDTO.class); + FileDownloadResponeDTO responseDTO = JSON.parseObject(sendMessageUtil.waitForResponse(requestDTO.getGuid(), 10*60), FileDownloadResponeDTO.class); String remoteName = responseDTO.getDetail().getMsg().getRemoteName(); diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java index ef09f4b..feec251 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java @@ -52,16 +52,20 @@ public class AppAutoDataConsumer extends EnhanceConsumerMessageHandler