diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index 24d142a..ab77130 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -20,6 +20,8 @@ public enum AccessResponseEnum { MODEL_REPEAT("A0302", "模板存在,请勿重复录入!"), MODEL_NO_FIND("A0302", "模板不存在,请先录入模板数据!"), + DICT_ANALYSIS_ERROR("A0303","字典解析错误!"), + MODEL_ANALYSIS_ERROR("A0303","模板解析错误!"), MESSAGE_TYPE_ERROR("A0303","报文消息类型Type错误!"), DEV_TYPE_ERROR("A0303","装置类型错误!"), DEV_NOT_FIND("A0303","装置类型未找到!"), @@ -58,6 +60,8 @@ public enum AccessResponseEnum { LDEVINFO_IS_NULL("A0309","逻辑设备信息为空"), SOFTINFO_IS_NULL("A0309","软件信息为空"), + LINE_POSITION_REPEAT("A0310","监测点位置重复") + ; private final String code; diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java index 635390b..26969b6 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java @@ -62,6 +62,7 @@ public enum TypeEnum { DATA_12("12","定值Set"), DATA_13("13","内部定值InSet"), DATA_14("14","控制Ctrl"), + DATA_16("16","波形文件"), /** * 数据模型列表 diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java new file mode 100644 index 0000000..8f23e28 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java @@ -0,0 +1,94 @@ +package com.njcn.access.pojo.dto; + +import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.List; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/22 11:36 + */ +@Data +public class EventDto { + + @SerializedName("Mid") + private Integer mid; + + @SerializedName("Did") + @ApiModelProperty("逻辑设备 治理逻辑设备为1 电能质量设备为2") + private Integer did; + + @SerializedName("Pri") + private Integer pri; + + @SerializedName("Type") + private Integer type; + + @SerializedName("Msg") + private Msg msg; + + @Data + public static class Msg{ + + @SerializedName("Cldid") + @ApiModelProperty("逻辑子设备 治理逻辑设备为0 电能质量设备为1、2") + private Integer clDid; + + @SerializedName("DataType") + private Integer dataType; + + @SerializedName("DataAttr") + @ApiModelProperty("数据属性:无-0、实时-1、统计-2") + private Integer dataAttr; + + @SerializedName("DsNameIdx") + private Integer dsNameIdx; + + @SerializedName("DataArray") + private List dataArray; + } + + @Data + public static class DataArray{ + + @SerializedName("Idx") + private Integer idx; + + @SerializedName("Name") + private String name; + + @SerializedName("DataTimeSec") + private Long dataTimeSec; + + @SerializedName("DataTimeUSec") + private Long dataTimeUSec; + + @SerializedName("Type") + @ApiModelProperty("事件类型") + private String type; + + @SerializedName("Parm") + private List param; + } + + @Data + public static class Param{ + + @SerializedName("Name") + private String name; + + @SerializedName("Type") + private String type; + + @SerializedName("Unit") + private String unit; + + @SerializedName("Data") + private Object data; + } +} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsEvtParmPO.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsEvtParmPO.java deleted file mode 100644 index 53bca8f..0000000 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsEvtParmPO.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.njcn.access.pojo.po; - -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; -import com.github.jeffreyning.mybatisplus.anno.MppMultiId; -import com.njcn.db.bo.BaseEntity; -import java.io.Serializable; -import java.time.LocalDateTime; - -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; - -/** - *

- * 事件参数表 - *

- * - * @author xuyang - * @since 2023-05-19 - */ -@EqualsAndHashCode(callSuper = true) -@Data -@TableName("cs_evt_parm") -public class CsEvtParmPO extends BaseEntity { - - private static final long serialVersionUID = 1L; - - /** - * id - */ - private String id; - - /** - * pid - */ - @MppMultiId(value = "pid") - private String pid; - - /** - * 事件信息名称 - */ - @MppMultiId(value = "name") - private String name; - - /** - * 参数类型“string”“float” - */ - private String type; - - /** - * 参数单位 - */ - private String unit; - - /** - * 参数值 - */ - private String data; - -} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 4c3a562..002bc35 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -25,8 +25,11 @@ import com.njcn.csdevice.api.DataSetFeignClient; import com.njcn.csdevice.api.DevModelFeignClient; import com.njcn.csdevice.pojo.po.CsDataSet; import com.njcn.csdevice.pojo.po.CsDevModelPO; +import com.njcn.mq.constant.BusinessTopic; import com.njcn.mq.message.AppAutoDataMessage; +import com.njcn.mq.message.AppEventMessage; import com.njcn.mq.template.AppAutoDataMessageTemplate; +import com.njcn.mq.template.AppEventMessageTemplate; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import lombok.AllArgsConstructor; @@ -65,9 +68,11 @@ public class MqttMessageHandler { private final ICsEquipmentDeliveryService csEquipmentDeliveryService; + private final DataSetFeignClient dataSetFeignClient; + private final AppAutoDataMessageTemplate appAutoDataMessageTemplate; - private final DataSetFeignClient dataSetFeignClient; + private final AppEventMessageTemplate appEventMessageTemplate; @Autowired Validator validator; @@ -293,12 +298,33 @@ public class MqttMessageHandler { } break; case 4866: - //处理主动上送数据 AutoDataDto dataDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), AutoDataDto.class); - JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(dataDto)); - AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject, AppAutoDataMessage.class); - appAutoDataMessage.setId(nDid); - appAutoDataMessageTemplate.sendMember(appAutoDataMessage); + switch (dataDto.getMsg().getDataAttr()) { + //暂态事件、录波处理 + //todo 后期告警可能也是在这处理,通过是告警还是事件来区分暂态和稳态 + case 0: + log.info("处理事件"); + EventDto eventDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), EventDto.class); + JSONObject jsonObject0 = JSONObject.parseObject(JSON.toJSONString(eventDto)); + AppEventMessage appEventMessage = JSONObject.toJavaObject(jsonObject0, AppEventMessage.class); + appEventMessage.setId(nDid); + appEventMessageTemplate.sendMember(appEventMessage); + break; + //实时数据 + case 1: + log.info("处理实时数据"); + break; + //处理主动上送的统计数据 + case 2: + log.info("处理统计数据"); + JSONObject jsonObject2 = JSONObject.parseObject(JSON.toJSONString(dataDto)); + AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject2, AppAutoDataMessage.class); + appAutoDataMessage.setId(nDid); + appAutoDataMessageTemplate.sendMember(appAutoDataMessage); + break; + default: + break; + } //mid大于0,则需要应答设备侧 if (dataDto.getMid() > 0){ ReqAndResDto.Res response = new ReqAndResDto.Res(); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index 7cab046..99eff06 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -85,8 +85,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService { analysisDataSet(templateDto,csDevModelPo.getId()); //3.录入监测点模板表(记录当前模板有几个监测点,治理类型的模板目前规定1个监测点,电能质量模板根据逻辑子设备来) addCsLineModel(templateDto,csDevModelPo.getId()); - } catch (IOException e) { - e.printStackTrace(); + } catch (Exception e) { + throw new BusinessException(AccessResponseEnum.MODEL_ANALYSIS_ERROR); } } @@ -99,8 +99,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService { Gson gson = new Gson(); TemplateDto templateDto = gson.fromJson(json, TemplateDto.class); analysisDict(templateDto); - } catch (IOException e) { - e.printStackTrace(); + } catch (Exception e) { + throw new BusinessException(AccessResponseEnum.DICT_ANALYSIS_ERROR); } } @@ -201,6 +201,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService { eleEvtParam.setPid(po.getId()); eleEvtParam.setData(param.getData()); eleEvtParam.setName(param.getName()); + eleEvtParam.setShowName(param.getName()); eleEvtParam.setType(param.getType()); eleEvtParam.setUnit(param.getUnit()); eleEvtFeignClient.add(eleEvtParam); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index a2eb989..411a79a 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -1,5 +1,6 @@ package com.njcn.access.service.impl; +import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.IdUtil; import com.alibaba.excel.util.CollectionUtils; import com.alibaba.fastjson.JSON; @@ -46,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * 类的介绍: @@ -228,6 +230,11 @@ public class CsDeviceServiceImpl implements ICsDeviceService { appLineTopologyDiagramPo.setStatus("1"); appLineTopologyDiagramPoList.add(appLineTopologyDiagramPo); } + List position = csLinePoList.stream().map(CsLinePO::getPosition).collect(Collectors.toList()); + List lineList = position.stream().filter(e-> Collections.frequency(position,e) > 1).distinct().collect(Collectors.toList()); + if (CollectionUtil.isNotEmpty(lineList)){ + throw new BusinessException(AccessResponseEnum.LINE_POSITION_REPEAT); + } csLineService.saveBatch(csLinePoList); //4.监测点拓扑图表录入关系 appLineTopologyDiagramService.saveBatch(appLineTopologyDiagramPoList); diff --git a/iot-access/access-boot/src/test/java/com/njcn/AppTest.java b/iot-access/access-boot/src/test/java/com/njcn/AppTest.java index fef8b4d..3e0212c 100644 --- a/iot-access/access-boot/src/test/java/com/njcn/AppTest.java +++ b/iot-access/access-boot/src/test/java/com/njcn/AppTest.java @@ -2,6 +2,7 @@ package com.njcn; import static org.junit.Assert.assertTrue; +import cn.hutool.core.util.IdUtil; import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.tocrhz.mqtt.publisher.MqttPublisher; @@ -11,6 +12,7 @@ import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.TypeEnum; import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.mqtt.MqttClientDto; +import com.njcn.csdevice.pojo.po.CsLinePO; import io.lettuce.core.protocol.CompleteableCommand; import okhttp3.Credentials; import okhttp3.OkHttpClient; @@ -25,8 +27,9 @@ import org.springframework.test.context.web.WebAppConfiguration; import javax.annotation.Resource; import java.io.BufferedReader; import java.io.IOException; +import java.lang.reflect.Array; import java.nio.charset.StandardCharsets; -import java.util.Objects; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -56,12 +59,34 @@ public class AppTest @Test public void test() { - ReqAndResDto reqAndResParam = new ReqAndResDto(); - reqAndResParam.setMid(1); - reqAndResParam.setDid(0); - reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); - reqAndResParam.setType(4866); - publisher.send("/Dev/Data1/V1/123", new Gson().toJson(reqAndResParam),1,false); +// ReqAndResDto reqAndResParam = new ReqAndResDto(); +// reqAndResParam.setMid(1); +// reqAndResParam.setDid(0); +// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); +// reqAndResParam.setType(4866); +// publisher.send("/Dev/Data1/V1/123", new Gson().toJson(reqAndResParam),1,false); + +// String key = String.valueOf(IdUtil.getSnowflake().nextId()); +// System.out.println("key==:" + key); + + List csLinePoList = new ArrayList<>(); + CsLinePO po1 = new CsLinePO(); + po1.setPosition("1"); + CsLinePO po2= new CsLinePO(); + po2.setPosition("2"); + CsLinePO po3= new CsLinePO(); + po3.setPosition("3"); + CsLinePO po4= new CsLinePO(); + po4.setPosition("1"); + + csLinePoList.add(po1); + csLinePoList.add(po2); + csLinePoList.add(po3); + csLinePoList.add(po4); + List l = csLinePoList.stream().map(CsLinePO::getPosition).collect(Collectors.toList()); + System.out.println("l===:" + l); + List lineList = l.stream().filter(e-> Collections.frequency(l,e) > 1).distinct().collect(Collectors.toList()); + System.out.println("lineList==:" + lineList); } // public static void main(String[] args) { diff --git a/iot-analysis/analysis-stat/stat-boot/pom.xml b/iot-analysis/analysis-stat/stat-boot/pom.xml index 2f41cc1..3e20b3f 100644 --- a/iot-analysis/analysis-stat/stat-boot/pom.xml +++ b/iot-analysis/analysis-stat/stat-boot/pom.xml @@ -40,46 +40,46 @@ common-db ${project.version} - - com.njcn - access-api - ${project.version} - - - com.njcn - common-mq - ${project.version} - - - com.njcn - cs-device-api - ${project.version} - - + com.njcn - stat-api + access-api ${project.version} - - - com.njcn - system-api - ${project.version} - - + + com.njcn - pqs-influx - 0.0.1-SNAPSHOT - - - com.njcn - common-influxDB + common-mq ${project.version} - compile - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - + + + com.njcn + cs-device-api + ${project.version} + + + com.njcn + stat-api + ${project.version} + + + com.njcn + system-api + ${project.version} + + + com.njcn + pqs-influx + 0.0.1-SNAPSHOT + + + com.njcn + common-influxDB + ${project.version} + compile + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java index 6e78fb9..c30a3cc 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -56,8 +56,6 @@ public class StatServiceImpl implements IStatService { private final RedisUtil redisUtil; - private final Integer NUMBER = 200; - @Override @Transactional(rollbackFor = Exception.class) public void analysis(AppAutoDataMessage appAutoDataMessage) { diff --git a/iot-analysis/analysis-stat/stat-boot/src/test/java/com/njcn/AppTest.java b/iot-analysis/analysis-stat/stat-boot/src/test/java/com/njcn/AppTest.java index a7e14d2..f255d81 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/test/java/com/njcn/AppTest.java +++ b/iot-analysis/analysis-stat/stat-boot/src/test/java/com/njcn/AppTest.java @@ -1,14 +1,38 @@ package com.njcn; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.njcn.common.utils.PubUtils; +import com.njcn.csdevice.pojo.po.CsDataArray; +import com.njcn.influx.pojo.constant.InfluxDBTableConstant; +import com.njcn.influxdb.utils.InfluxDbUtils; +import com.njcn.mq.message.AppAutoDataMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; import org.junit.Test; +import org.springframework.web.bind.annotation.ResponseBody; + +import javax.annotation.Resource; +import java.util.*; +import java.util.concurrent.*; /** * Unit test for simple App. */ public class AppTest { + + @Resource + private RedisUtil redisUtil; + + @Resource + private InfluxDbUtils influxDbUtils; + /** * Rigorous Test :-) */ @@ -17,4 +41,5 @@ public class AppTest { assertTrue( true ); } + } diff --git a/iot-analysis/analysis-zl-event/zl-event-api/pom.xml b/iot-analysis/analysis-zl-event/zl-event-api/pom.xml index 6a69bce..12e23d4 100644 --- a/iot-analysis/analysis-zl-event/zl-event-api/pom.xml +++ b/iot-analysis/analysis-zl-event/zl-event-api/pom.xml @@ -8,6 +8,32 @@ 1.0.0 4.0.0 + + + com.njcn + common-core + ${project.version} + + + com.njcn + common-db + ${project.version} + + + org.projectlombok + lombok + + + com.njcn + common-microservice + ${project.version} + + + com.njcn + common-mq + ${project.version} + + zl-event-api diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java new file mode 100644 index 0000000..42a4fad --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java @@ -0,0 +1,18 @@ +package com.njcn.zlevent.api; + +import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.zlevent.api.fallback.EventClientFallbackFactory; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; + +/** + * @author xy + */ +@FeignClient(value = ServerInfo.CS_ZL_EVENT_BOOT, path = "/event", fallbackFactory = EventClientFallbackFactory.class,contextId = "event") +public interface EventFeignClient { + + @PostMapping("/analysis") + HttpResult analysis(AppEventMessage appEventMessage); +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java new file mode 100644 index 0000000..0be5fa7 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java @@ -0,0 +1,35 @@ +package com.njcn.zlevent.api.fallback; + +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.zlevent.api.EventFeignClient; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author xy + */ +@Slf4j +@Component +public class EventClientFallbackFactory implements FallbackFactory { + @Override + public EventFeignClient create(Throwable cause) { + //判断抛出异常是否为解码器抛出的业务异常 + Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; + if (cause.getCause() instanceof BusinessException) { + BusinessException businessException = (BusinessException) cause.getCause(); + } + Enum finalExceptionEnum = exceptionEnum; + return new EventFeignClient() { + + @Override + public HttpResult analysis(AppEventMessage appEventMessage) { + log.error("{}异常,降级处理,异常为:{}","数据解析",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEvent.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEvent.java new file mode 100644 index 0000000..dd626e3 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEvent.java @@ -0,0 +1,48 @@ +package com.njcn.zlevent.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.njcn.db.bo.BaseEntity; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.time.LocalDateTime; + +/** + *

+ * 暂态事件表 + *

+ * + * @author xuyang + * @since 2023-08-23 + */ +@Data +@TableName("cs_event") +public class CsEvent { + + private static final long serialVersionUID = 1L; + + /** + * id + */ + private String id; + + /** + * 监测点id + */ + private String lineId; + + /** + * 事件名称 + */ + private String name; + + /** + * 展示名称 + */ + private String showName; + + /** + * 开始时间 + */ + private LocalDateTime startTime; +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventDetail.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventDetail.java new file mode 100644 index 0000000..043aad2 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventDetail.java @@ -0,0 +1,61 @@ +package com.njcn.zlevent.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +/** + *

+ * 暂态事件详情表 + *

+ * + * @author xuyang + * @since 2023-08-23 + */ +@Data +@TableName("cs_event_detail") +public class CsEventDetail { + + private static final long serialVersionUID = 1L; + + /** + * id + */ + private String id; + + /** + * 暂态事件id + */ + private String pid; + + /** + * 指标名称 + */ + private String name; + + /** + * 指标别名 + */ + private String showName; + + /** + * 数据类型 + */ + private String type; + + /** + * 单位 + */ + private String unit; + + /** + * 数值 + */ + private Double data; + + /** + * 相别 + */ + private String phasic; + + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml b/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml index 9533a0c..3ed6e00 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml +++ b/iot-analysis/analysis-zl-event/zl-event-boot/pom.xml @@ -20,6 +20,15 @@ + + com.baomidou + dynamic-datasource-spring-boot-starter + + + com.njcn + common-db + ${project.version} + com.njcn common-web @@ -39,6 +48,31 @@ common-db ${project.version} + + com.njcn + common-mq + ${project.version} + + + com.njcn + cs-device-api + ${project.version} + + + com.njcn + stat-api + ${project.version} + + + com.njcn + zl-event-api + ${project.version} + + + com.njcn + system-api + ${project.version} + diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java new file mode 100644 index 0000000..4f99edf --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java @@ -0,0 +1,47 @@ +package com.njcn.zlevent.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.web.controller.BaseController; +import com.njcn.zlevent.service.IEventService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/14 9:23 + */ +@Slf4j +@RestController +@RequestMapping("/event") +@Api(tags = "暂态事件处理") +@AllArgsConstructor +public class EventController extends BaseController { + + private final IEventService eventService; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/analysis") + @ApiOperation("事件解析") + @ApiImplicitParam(name = "appEventMessage", value = "数据实体", required = true) + public HttpResult analysis(@RequestBody AppEventMessage appEventMessage){ + String methodDescribe = getMethodDescribe("analysis"); + eventService.analysis(appEventMessage); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java new file mode 100644 index 0000000..75a6fc8 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/WaveController.java @@ -0,0 +1,41 @@ +package com.njcn.zlevent.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.web.controller.BaseController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @crateTime 2023/8/23 9:59 + */ +@Slf4j +@RestController +@RequestMapping("/wave") +@Api(tags = "录波处理") +@AllArgsConstructor +public class WaveController extends BaseController { + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/analysis") + @ApiOperation("录波解析") + @ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true) + public HttpResult analysis(){ + String methodDescribe = getMethodDescribe("analysis"); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/handler/MqttMessageHandler.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/handler/MqttMessageHandler.java new file mode 100644 index 0000000..01da64f --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/handler/MqttMessageHandler.java @@ -0,0 +1,29 @@ +package com.njcn.zlevent.handler; + +import com.github.tocrhz.mqtt.publisher.MqttPublisher; +import com.njcn.redis.utils.RedisUtil; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.validation.Validator; + +/** + * @author xuyang + * @version 1.0.0 + * @date 2023年08月23日 09:41 + */ +@Slf4j +@Component +@AllArgsConstructor +public class MqttMessageHandler { + + private final MqttPublisher publisher; + + private final RedisUtil redisUtil; + + @Autowired + Validator validator; + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java new file mode 100644 index 0000000..4e5bf30 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/listener/RedisKeyExpirationListener.java @@ -0,0 +1,67 @@ +package com.njcn.zlevent.listener; + +import cn.hutool.core.collection.CollectionUtil; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.stat.enums.StatResponseEnum; +import com.njcn.system.api.EpdFeignClient; +import com.njcn.system.pojo.dto.EpdDTO; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.core.annotation.Order; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2022年04月02日 14:31 + */ +@Slf4j +@Component +public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { + + @Resource + private EpdFeignClient epdFeignClient; + + @Resource + private RedisUtil redisUtil; + + public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { + super(listenerContainer); + } + + + /** + * 针对redis数据失效事件,进行数据处理 + * 注意message.toString()可以获取失效的key + */ + @Override + @Order(0) + public void onMessage(Message message, byte[] pattern) { + if (StringUtils.isBlank(message.toString())) { + return; + } + //判断失效的key + String expiredKey = message.toString(); + if(expiredKey.equals(AppRedisKey.ELE_EPD_PQD)){ + Map map = new HashMap<>(); + List list = epdFeignClient.findAll().getData(); + if (CollectionUtil.isEmpty(list)){ + throw new BusinessException(StatResponseEnum.DICT_NULL); + } + list.forEach(item->{ + map.put(item.getDictName(),item.getTableName()); + }); + redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,3600L); + } + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventDetailMapper.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventDetailMapper.java new file mode 100644 index 0000000..b663106 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventDetailMapper.java @@ -0,0 +1,16 @@ +package com.njcn.zlevent.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.zlevent.pojo.po.CsEventDetail; + +/** + *

+ * 暂态事件详情表 Mapper 接口 + *

+ * + * @author xuyang + * @since 2023-08-23 + */ +public interface CsEventDetailMapper extends BaseMapper { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventMapper.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventMapper.java new file mode 100644 index 0000000..c7f7e3f --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventMapper.java @@ -0,0 +1,16 @@ +package com.njcn.zlevent.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.zlevent.pojo.po.CsEvent; + +/** + *

+ * 暂态事件表 Mapper 接口 + *

+ * + * @author xuyang + * @since 2023-08-23 + */ +public interface CsEventMapper extends BaseMapper { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventDetailService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventDetailService.java new file mode 100644 index 0000000..1754b42 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventDetailService.java @@ -0,0 +1,16 @@ +package com.njcn.zlevent.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.zlevent.pojo.po.CsEventDetail; + +/** + *

+ * 暂态事件详情表 服务类 + *

+ * + * @author xuyang + * @since 2023-08-23 + */ +public interface ICsEventDetailService extends IService { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java new file mode 100644 index 0000000..28a9afa --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventService.java @@ -0,0 +1,16 @@ +package com.njcn.zlevent.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.zlevent.pojo.po.CsEvent; + +/** + *

+ * 暂态事件表 服务类 + *

+ * + * @author xuyang + * @since 2023-08-23 + */ +public interface ICsEventService extends IService { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java new file mode 100644 index 0000000..31b1df2 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java @@ -0,0 +1,16 @@ +package com.njcn.zlevent.service; + +import com.njcn.mq.message.AppEventMessage; + +/** + * @author xy + */ +public interface IEventService { + + /** + * 解析事件数据 + * @param + */ + void analysis(AppEventMessage appEventMessage); + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventDetailServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventDetailServiceImpl.java new file mode 100644 index 0000000..c75975e --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventDetailServiceImpl.java @@ -0,0 +1,20 @@ +package com.njcn.zlevent.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.zlevent.mapper.CsEventDetailMapper; +import com.njcn.zlevent.pojo.po.CsEventDetail; +import com.njcn.zlevent.service.ICsEventDetailService; +import org.springframework.stereotype.Service; + +/** + *

+ * 暂态事件详情表 服务实现类 + *

+ * + * @author xuyang + * @since 2023-08-23 + */ +@Service +public class CsEventDetailServiceImpl extends ServiceImpl implements ICsEventDetailService { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java new file mode 100644 index 0000000..49d23c8 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java @@ -0,0 +1,20 @@ +package com.njcn.zlevent.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.zlevent.mapper.CsEventMapper; +import com.njcn.zlevent.pojo.po.CsEvent; +import com.njcn.zlevent.service.ICsEventService; +import org.springframework.stereotype.Service; + +/** + *

+ * 暂态事件表 服务实现类 + *

+ * + * @author xuyang + * @since 2023-08-23 + */ +@Service +public class CsEventServiceImpl extends ServiceImpl implements ICsEventService { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java new file mode 100644 index 0000000..358ee38 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java @@ -0,0 +1,141 @@ +package com.njcn.zlevent.service.impl; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.IdUtil; +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.pojo.po.CsLinePO; +import com.njcn.mq.message.AppEventMessage; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.stat.enums.StatResponseEnum; +import com.njcn.system.api.DicDataFeignClient; +import com.njcn.system.api.EpdFeignClient; +import com.njcn.system.enums.DicDataEnum; +import com.njcn.system.pojo.po.DictData; +import com.njcn.zlevent.pojo.po.CsEvent; +import com.njcn.zlevent.pojo.po.CsEventDetail; +import com.njcn.zlevent.service.ICsEventDetailService; +import com.njcn.zlevent.service.ICsEventService; +import com.njcn.zlevent.service.IEventService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/14 9:32 + */ +@Service +@Slf4j +@AllArgsConstructor +public class EventServiceImpl implements IEventService { + + private final CsLineFeignClient csLineFeignClient; + + private final DicDataFeignClient dicDataFeignClient; + + private final EpdFeignClient epdFeignClient; + + private final RedisUtil redisUtil; + + private final ICsEventService csEventService; + + private final ICsEventDetailService csEventDetailService; + + @Override + @Transactional(rollbackFor = Exception.class) + public void analysis(AppEventMessage appEventMessage) { + List list1 = new ArrayList<>(); + List list2 = new ArrayList<>(); + //获取监测点id + String lineId = null; + Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId()); + if (Objects.isNull(object1)){ + lineInfo(appEventMessage.getId()); + } + if (Objects.equals(appEventMessage.getDid(),1)){ + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString(); + } else if (Objects.equals(appEventMessage.getDid(),2)){ + lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString(); + } + //处理事件数据 + List dataArray = appEventMessage.getMsg().getDataArray(); + for (AppEventMessage.DataArray item : dataArray) { + String id = IdUtil.fastSimpleUUID(); + //事件入库 + CsEvent csEvent = new CsEvent(); + csEvent.setId(id); + csEvent.setLineId(lineId); + csEvent.setName(item.getName()); + csEvent.setShowName(epdFeignClient.findByName(item.getName()).getData().getShowName()); + csEvent.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec())); + List params = item.getParam(); + for (AppEventMessage.Param item2 : params) { + CsEventDetail csEventDetail = new CsEventDetail(); + csEventDetail.setPid(id); + csEventDetail.setName(item2.getName()); + csEventDetail.setShowName(epdFeignClient.findByName(item2.getName()).getData().getShowName()); + csEventDetail.setType(item2.getType()); + csEventDetail.setUnit(item2.getUnit()); + if (Objects.equals(item2.getName(),"Evt_VVaPhas")){ + csEventDetail.setPhasic(item2.getData().toString()); + } else { + csEventDetail.setData(Double.parseDouble(item2.getData().toString())); + } + list2.add(csEventDetail); + } + list1.add(csEvent); + } + csEventService.saveBatch(list1); + csEventDetailService.saveBatch(list2); + } + + /** + * 缓存监测点相关信息 + */ + public void lineInfo(String id) { + Map map = new HashMap<>(); + List lineList = csLineFeignClient.findByNdid(id).getData(); + if (CollectionUtil.isEmpty(lineList)){ + throw new BusinessException(StatResponseEnum.LINE_NULL); + } + for (CsLinePO item : lineList) { + DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData(); + if (Objects.isNull(dictData)){ + throw new BusinessException(StatResponseEnum.DICT_NULL); + } + if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){ + map.put(0,item.getLineId()); + } else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){ + map.put(1,item.getLineId()); + } else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){ + map.put(2,item.getLineId()); + } + } + redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_POSITION+id,map,600L); + } + + /** + * 时间转换 + */ + public LocalDateTime timeFormat(Long time1, Long time2) { + //设置格式 + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String timeText = format.format(time1 * 1000); + String time = timeText + "." + time2; + DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + return LocalDateTime.parse(time, fmt); + } + +} diff --git a/iot-message/message-boot/pom.xml b/iot-message/message-boot/pom.xml index 00b0e79..d9061b9 100644 --- a/iot-message/message-boot/pom.xml +++ b/iot-message/message-boot/pom.xml @@ -66,6 +66,11 @@ system-api ${project.version} + + com.njcn + zl-event-api + ${project.version} + diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java index 01d3975..d6a0c1a 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java @@ -50,6 +50,9 @@ public class AppAutoDataConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + + @Resource + private EventFeignClient eventFeignClient; + + @Override + protected void handleMessage(AppEventMessage appEventMessage) { + Integer dataType = appEventMessage.getMsg().getDataType(); + switch (dataType) { + case 6: + log.info("分发至暂态事件处理"); + eventFeignClient.analysis(appEventMessage); + break; + case 16: + log.info("分发至录波处理"); + break; + default: + break; + } + } + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(AppEventMessage message) { + String keyStatus = redisUtil.getStringByKey(message.getKey()); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.BEING_PROCESSED, 60L); + return false; + } + return true; + } + + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(AppEventMessage message) { + redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(AppEventMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + /*** + * 调用父类handler处理消息的元信息 + */ + @Override + public void onMessage(AppEventMessage appEventMessage) { + super.dispatchMessage(appEventMessage); + } +} diff --git a/pom.xml b/pom.xml index 5d8a24a..6e5d8a4 100644 --- a/pom.xml +++ b/pom.xml @@ -34,6 +34,7 @@ 192.168.1.13 + 127.0.0.1 192.168.1.13