From f8b240cf5f6fe8333977a2062f6c39336a6b752c Mon Sep 17 00:00:00 2001 From: xuyang <748613696@qq.com> Date: Thu, 17 Aug 2023 20:25:19 +0800 Subject: [PATCH] =?UTF-8?q?1.=E7=89=A9=E8=A7=A3=E6=9E=90=E5=8A=9F=E8=83=BD?= =?UTF-8?q?-=E7=BB=9F=E8=AE=A1=E6=95=B0=E6=8D=AE=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E5=AE=8C=E6=88=90=202.=E7=89=A9=E6=B6=88=E6=81=AF=E8=B7=AF?= =?UTF-8?q?=E7=94=B1=E8=BD=AC=E5=8F=91=E5=8A=9F=E8=83=BD=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/handler/MqttMessageHandler.java | 3 +- ...tFeignClient.java => StatFeignClient.java} | 7 +- ...ry.java => StatClientFallbackFactory.java} | 8 +-- .../listener/RedisKeyExpirationListener.java | 67 +++++++++++++++++++ .../stat/service/impl/StatServiceImpl.java | 36 +++++++--- .../message/consumer/AppAutoDataConsumer.java | 14 ++-- 6 files changed, 112 insertions(+), 23 deletions(-) rename iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/{RtFeignClient.java => StatFeignClient.java} (70%) rename iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/{RtClientFallbackFactory.java => StatClientFallbackFactory.java} (83%) create mode 100644 iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/listener/RedisKeyExpirationListener.java diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 5a7f4c5..4c3a562 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -277,6 +277,8 @@ public class MqttMessageHandler { reqAndResParam.setCode(200); reqAndResParam.setMsg(heartBeatDto); publisher.send("/Dev/DataRsp/"+version+"/"+nDid,gson.toJson(reqAndResParam),1,false); + //装置改成在线 + csEquipmentDeliveryService.updateStatusBynDid(nDid, AccessEnum.ACCESS.getCode()); //处理业务逻辑 Object object = res.getMsg(); if (!Objects.isNull(object)){ @@ -292,7 +294,6 @@ public class MqttMessageHandler { break; case 4866: //处理主动上送数据 - //todo 将消息发送给rocketMQ,判断消息类型,需要回复 AutoDataDto dataDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), AutoDataDto.class); JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(dataDto)); AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject, AppAutoDataMessage.class); diff --git a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/RtFeignClient.java b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/StatFeignClient.java similarity index 70% rename from iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/RtFeignClient.java rename to iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/StatFeignClient.java index c275ece..1860a84 100644 --- a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/RtFeignClient.java +++ b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/StatFeignClient.java @@ -3,16 +3,15 @@ package com.njcn.stat.api; import com.njcn.common.pojo.constant.ServerInfo; import com.njcn.common.pojo.response.HttpResult; import com.njcn.mq.message.AppAutoDataMessage; -import com.njcn.stat.api.fallback.RtClientFallbackFactory; -import io.swagger.annotations.ApiOperation; +import com.njcn.stat.api.fallback.StatClientFallbackFactory; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; /** * @author xy */ -@FeignClient(value = ServerInfo.CS_STAT_BOOT, path = "/stat", fallbackFactory = RtClientFallbackFactory.class,contextId = "stat") -public interface RtFeignClient { +@FeignClient(value = ServerInfo.CS_STAT_BOOT, path = "/stat", fallbackFactory = StatClientFallbackFactory.class,contextId = "stat") +public interface StatFeignClient { @PostMapping("/analysis") HttpResult analysis(AppAutoDataMessage appAutoDataMessage); diff --git a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/RtClientFallbackFactory.java b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/StatClientFallbackFactory.java similarity index 83% rename from iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/RtClientFallbackFactory.java rename to iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/StatClientFallbackFactory.java index 9e1e4af..75c5614 100644 --- a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/RtClientFallbackFactory.java +++ b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/StatClientFallbackFactory.java @@ -4,7 +4,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.response.HttpResult; import com.njcn.mq.message.AppAutoDataMessage; -import com.njcn.stat.api.RtFeignClient; +import com.njcn.stat.api.StatFeignClient; import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -14,16 +14,16 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component -public class RtClientFallbackFactory implements FallbackFactory { +public class StatClientFallbackFactory implements FallbackFactory { @Override - public RtFeignClient create(Throwable cause) { + public StatFeignClient create(Throwable cause) { //判断抛出异常是否为解码器抛出的业务异常 Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; if (cause.getCause() instanceof BusinessException) { BusinessException businessException = (BusinessException) cause.getCause(); } Enum finalExceptionEnum = exceptionEnum; - return new RtFeignClient() { + return new StatFeignClient() { @Override public HttpResult analysis(AppAutoDataMessage appAutoDataMessage) { diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/listener/RedisKeyExpirationListener.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/listener/RedisKeyExpirationListener.java new file mode 100644 index 0000000..ce447c1 --- /dev/null +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/listener/RedisKeyExpirationListener.java @@ -0,0 +1,67 @@ +package com.njcn.stat.listener; + +import cn.hutool.core.collection.CollectionUtil; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.stat.enums.StatResponseEnum; +import com.njcn.system.api.EpdFeignClient; +import com.njcn.system.pojo.dto.EpdDTO; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.core.annotation.Order; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2022年04月02日 14:31 + */ +@Slf4j +@Component +public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { + + @Resource + private EpdFeignClient epdFeignClient; + + @Resource + private RedisUtil redisUtil; + + public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { + super(listenerContainer); + } + + + /** + * 针对redis数据失效事件,进行数据处理 + * 注意message.toString()可以获取失效的key + */ + @Override + @Order(0) + public void onMessage(Message message, byte[] pattern) { + if (StringUtils.isBlank(message.toString())) { + return; + } + //判断失效的key + String expiredKey = message.toString(); + if(expiredKey.equals(AppRedisKey.ELE_EPD_PQD)){ + Map map = new HashMap<>(); + List list = epdFeignClient.findAll().getData(); + if (CollectionUtil.isEmpty(list)){ + throw new BusinessException(StatResponseEnum.DICT_NULL); + } + list.forEach(item->{ + map.put(item.getDictName(),item.getTableName()); + }); + redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,3600L); + } + } +} diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java index 84efc2d..6e78fb9 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -1,9 +1,7 @@ package com.njcn.stat.service.impl; import cn.hutool.core.collection.CollectionUtil; -import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.shaded.com.google.gson.Gson; -import com.njcn.access.pojo.dto.CsModelDto; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.utils.PubUtils; import com.njcn.csdevice.api.CsLineFeignClient; @@ -25,6 +23,9 @@ import com.njcn.system.pojo.dto.EpdDTO; import com.njcn.system.pojo.po.DictData; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -55,9 +56,12 @@ public class StatServiceImpl implements IStatService { private final RedisUtil redisUtil; + private final Integer NUMBER = 200; + @Override @Transactional(rollbackFor = Exception.class) public void analysis(AppAutoDataMessage appAutoDataMessage) { + log.info("开始消费{},发送时间{}",appAutoDataMessage.getKey(),appAutoDataMessage.getSendTime()); //1.根据设备网络识别码获取设备id,查询到所用的模板,用来判断模板的类型(治理模板还是电能质量模板) //2.解析appAutoDataMessage的Did,来判断当前数据是治理数据还是电能质量数据 //3-1.治理数据则获取治理的dataArray,并且查询治理的监测点 @@ -85,6 +89,7 @@ public class StatServiceImpl implements IStatService { saveData(); } if (CollectionUtil.isNotEmpty(list)){ + List recordList = new ArrayList<>(); for (AppAutoDataMessage.DataArray item : list) { switch (item.getDataAttr()) { case 1: @@ -113,7 +118,12 @@ public class StatServiceImpl implements IStatService { } else { dataArrayList = objectToList(object); } - insertData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod()); + List result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod()); + recordList.addAll(result); + } + if (CollectionUtil.isNotEmpty(recordList)){ + //influx数据批量入库 + influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, recordList); } } } @@ -155,7 +165,7 @@ public class StatServiceImpl implements IStatService { list.forEach(item->{ map.put(item.getDictName(),item.getTableName()); }); - redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,600L); + redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,3600L); } /** @@ -173,9 +183,10 @@ public class StatServiceImpl implements IStatService { /** - * influxDB数据入库 + * influxDB数据组装 */ - public void insertData(String lineId,List dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod) { + public List assembleData(String lineId,List dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod) { + List records = new ArrayList(); //解码 List floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData())); if (CollectionUtil.isEmpty(floats)){ @@ -185,8 +196,13 @@ public class StatServiceImpl implements IStatService { if (!Objects.equals(dataArrayList.size(),floats.size())){ throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH); } + //判断字典数据是否存在 + if (Objects.isNull(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD))){ + saveData(); + } + Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); for (int i = 0; i < dataArrayList.size(); i++) { - String tableName = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class).get(dataArrayList.get(i).getName()).toString(); + String tableName = map.get(dataArrayList.get(i).getName()); Map tags = new HashMap<>(); tags.put(InfluxDBTableConstant.LINE_ID,lineId); tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase()); @@ -195,8 +211,12 @@ public class StatServiceImpl implements IStatService { Map fields = new HashMap<>(); fields.put(dataArrayList.get(i).getName(),floats.get(i)); fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag()); - influxDbUtils.insert(tableName, tags, fields, item.getDataTimeSec(), TimeUnit.SECONDS); + Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec(), TimeUnit.SECONDS, tags, fields); + BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + batchPoints.point(point); + records.add(batchPoints.lineProtocol()); } + return records; } public List objectToList(Object object) { diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java index e14d486..d7fd2b9 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java @@ -3,7 +3,7 @@ package com.njcn.message.consumer; import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; import com.njcn.mq.constant.BusinessTopic; import com.njcn.mq.message.AppAutoDataMessage; -import com.njcn.stat.api.RtFeignClient; +import com.njcn.stat.api.StatFeignClient; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; @@ -20,15 +20,17 @@ import javax.annotation.Resource; */ @Service @RocketMQMessageListener( - topic = BusinessTopic.NJCJ_APP_AUTO_DATA_TOPIC, - consumerGroup = BusinessTopic.NJCJ_APP_AUTO_DATA_TOPIC, + topic = BusinessTopic.NJCN_APP_AUTO_DATA_TOPIC, + selectorExpression = BusinessTopic.AppDataTag.STAT_TAG, + consumerGroup = BusinessTopic.NJCN_APP_AUTO_DATA_TOPIC, + consumeThreadNumber = 10, enableMsgTrace = true ) @Slf4j public class AppAutoDataConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { @Resource - private RtFeignClient rtFeignClient; + private StatFeignClient statFeignClient; @Override protected void handleMessage(AppAutoDataMessage appAutoDataMessage) { @@ -38,8 +40,8 @@ public class AppAutoDataConsumer extends EnhanceConsumerMessageHandler