From 49daa2cc47a8a1234982ec12b867f2ee151a3bbf Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Tue, 11 Mar 2025 18:18:11 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E8=AE=AF=E8=A1=A8=E5=85=A5=E5=BA=93?= =?UTF-8?q?=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../stat/api/MessAnalysisFeignClient.java | 9 ++ ...essAnalysisFeignClientFallbackFactory.java | 7 + .../controller/MessageAnalysisController.java | 11 ++ .../stat/service/MessageAnalysisService.java | 3 + .../impl/MessageAnalysisServiceImpl.java | 18 ++- .../api/PqsCommunicateFeignClient.java | 5 + ...CommunicateFeignClientFallbackFactory.java | 6 + data-processing/data-processing-boot/pom.xml | 11 ++ .../controller/PqsCommunicateController.java | 9 ++ .../dataProcess/service/IPqsCommunicate.java | 1 + .../influxdb/InfluxdbPqsCommunicateImpl.java | 42 ++++- .../relation/RelationPqsCommunicateImpl.java | 5 + .../message/messagedto/DevComFlagDTO.java | 26 ++++ .../consumer/DeviceRunFlagDataConsumer.java | 143 ++++++++++++++++++ 14 files changed, 294 insertions(+), 2 deletions(-) create mode 100644 message/message-api/src/main/java/com/njcn/message/messagedto/DevComFlagDTO.java create mode 100644 message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java diff --git a/analysis/stat/stat-api/src/main/java/com/njcn/stat/api/MessAnalysisFeignClient.java b/analysis/stat/stat-api/src/main/java/com/njcn/stat/api/MessAnalysisFeignClient.java index 5feec13..dec3619 100644 --- a/analysis/stat/stat-api/src/main/java/com/njcn/stat/api/MessAnalysisFeignClient.java +++ b/analysis/stat/stat-api/src/main/java/com/njcn/stat/api/MessAnalysisFeignClient.java @@ -1,10 +1,14 @@ package com.njcn.stat.api; +import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.response.HttpResult; +import com.njcn.message.messagedto.DevComFlagDTO; import com.njcn.message.messagedto.MessageDataDTO; import com.njcn.stat.api.fallback.MessAnalysisFeignClientFallbackFactory; +import io.swagger.annotations.ApiOperation; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -26,4 +30,9 @@ import java.util.List; public interface MessAnalysisFeignClient { @PostMapping("/analysis") HttpResult analysis(@RequestBody List messageList); + + + @ApiOperation("处理设备状态数据") + @PostMapping("/handleDevRunflag") + HttpResult handleDevRunflag(@RequestBody DevComFlagDTO devComFlagDTO); } diff --git a/analysis/stat/stat-api/src/main/java/com/njcn/stat/api/fallback/MessAnalysisFeignClientFallbackFactory.java b/analysis/stat/stat-api/src/main/java/com/njcn/stat/api/fallback/MessAnalysisFeignClientFallbackFactory.java index 71672ea..a9c4cc0 100644 --- a/analysis/stat/stat-api/src/main/java/com/njcn/stat/api/fallback/MessAnalysisFeignClientFallbackFactory.java +++ b/analysis/stat/stat-api/src/main/java/com/njcn/stat/api/fallback/MessAnalysisFeignClientFallbackFactory.java @@ -4,6 +4,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.response.HttpResult; +import com.njcn.message.messagedto.DevComFlagDTO; import com.njcn.message.messagedto.MessageDataDTO; import com.njcn.stat.api.MessAnalysisFeignClient; import com.njcn.stat.utils.StatEnumUtil; @@ -40,6 +41,12 @@ public class MessAnalysisFeignClientFallbackFactory implements FallbackFactory handleDevRunflag(DevComFlagDTO devComFlagDTO) { + log.error("{}异常,降级处理,异常为:{}", "处理设备状态数据", throwable.toString()); + throw new BusinessException(finalExceptionEnum); + } }; } } diff --git a/analysis/stat/stat-boot/src/main/java/com/njcn/stat/controller/MessageAnalysisController.java b/analysis/stat/stat-boot/src/main/java/com/njcn/stat/controller/MessageAnalysisController.java index 3ea7220..3d639d0 100644 --- a/analysis/stat/stat-boot/src/main/java/com/njcn/stat/controller/MessageAnalysisController.java +++ b/analysis/stat/stat-boot/src/main/java/com/njcn/stat/controller/MessageAnalysisController.java @@ -6,6 +6,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; +import com.njcn.message.messagedto.DevComFlagDTO; import com.njcn.message.messagedto.MessageDataDTO; import com.njcn.stat.service.MessageAnalysisService; import com.njcn.web.controller.BaseController; @@ -44,6 +45,16 @@ public class MessageAnalysisController extends BaseController { messageAnalysisService.analysis(messageList); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, CommonResponseEnum.SUCCESS.getMessage(), methodDescribe); + } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/handleDevRunflag") + @ApiOperation("处理设备状态数据") + public HttpResult handleDevRunflag(@RequestBody DevComFlagDTO devComFlagDTO){ + String methodDescribe = getMethodDescribe("handleDevRunflag"); + + messageAnalysisService.handleDevRunflag(devComFlagDTO); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, CommonResponseEnum.SUCCESS.getMessage(), methodDescribe); } } diff --git a/analysis/stat/stat-boot/src/main/java/com/njcn/stat/service/MessageAnalysisService.java b/analysis/stat/stat-boot/src/main/java/com/njcn/stat/service/MessageAnalysisService.java index eb721af..9ee0c59 100644 --- a/analysis/stat/stat-boot/src/main/java/com/njcn/stat/service/MessageAnalysisService.java +++ b/analysis/stat/stat-boot/src/main/java/com/njcn/stat/service/MessageAnalysisService.java @@ -1,6 +1,7 @@ package com.njcn.stat.service; +import com.njcn.message.messagedto.DevComFlagDTO; import com.njcn.message.messagedto.MessageDataDTO; import java.util.List; @@ -14,4 +15,6 @@ import java.util.List; */ public interface MessageAnalysisService { void analysis(List messageList); + + void handleDevRunflag(DevComFlagDTO devComFlagDTO); } diff --git a/analysis/stat/stat-boot/src/main/java/com/njcn/stat/service/impl/MessageAnalysisServiceImpl.java b/analysis/stat/stat-boot/src/main/java/com/njcn/stat/service/impl/MessageAnalysisServiceImpl.java index 876948d..50ece87 100644 --- a/analysis/stat/stat-boot/src/main/java/com/njcn/stat/service/impl/MessageAnalysisServiceImpl.java +++ b/analysis/stat/stat-boot/src/main/java/com/njcn/stat/service/impl/MessageAnalysisServiceImpl.java @@ -1,11 +1,16 @@ package com.njcn.stat.service.impl; +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.LocalDateTimeUtil; import com.alibaba.fastjson.JSONObject; import com.njcn.dataProcess.api.LnDataDealFeignClient; +import com.njcn.dataProcess.api.PqsCommunicateFeignClient; import com.njcn.dataProcess.api.RmpEventDetailFeignClient; import com.njcn.dataProcess.dto.*; +import com.njcn.dataProcess.pojo.dto.PqsCommunicateDto; import com.njcn.message.enums.DataTypeEnum; +import com.njcn.message.messagedto.DevComFlagDTO; import com.njcn.message.messagedto.MessageDataDTO; import com.njcn.stat.messagedto.*; import com.njcn.stat.service.MessageAnalysisService; @@ -33,7 +38,8 @@ public class MessageAnalysisServiceImpl implements MessageAnalysisService { @Autowired private LnDataDealFeignClient lnDataDealFeignClient; - + @Autowired + private PqsCommunicateFeignClient pqsCommunicateFeignClient; // @Autowired // private RmpEventDetailFeignClient rmpEventDetailFeignClient; @Override @@ -509,4 +515,14 @@ public class MessageAnalysisServiceImpl implements MessageAnalysisService { lnDataDealFeignClient.batchInsertion(lnDataDTO); } + + @Override + public void handleDevRunflag(DevComFlagDTO devComFlagDTO) { + PqsCommunicateDto pqsCommunicateDto = new PqsCommunicateDto(); + pqsCommunicateDto.setTime(LocalDateTimeUtil.format(devComFlagDTO.getDate(), DatePattern.NORM_DATETIME_PATTERN)); + pqsCommunicateDto.setDevId(devComFlagDTO.getId()); + pqsCommunicateDto.setType(devComFlagDTO.getStatus()); + + pqsCommunicateFeignClient.insertion(pqsCommunicateDto); + } } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/PqsCommunicateFeignClient.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/PqsCommunicateFeignClient.java index 3bc21b8..1ed251d 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/PqsCommunicateFeignClient.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/PqsCommunicateFeignClient.java @@ -5,6 +5,7 @@ import com.njcn.common.pojo.response.HttpResult; import com.njcn.dataProcess.api.fallback.DataInharmVFeignClientFallbackFactory; import com.njcn.dataProcess.param.LineCountEvaluateParam; import com.njcn.dataProcess.pojo.dto.PqsCommunicateDto; +import io.swagger.annotations.ApiOperation; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -44,4 +45,8 @@ public interface PqsCommunicateFeignClient { @PostMapping("/getRawDataEnd") HttpResult> getRawDataEnd(@RequestBody LineCountEvaluateParam lineParam); + @PostMapping("/insertion") + @ApiOperation("插入数据") + HttpResult insertion(@RequestBody PqsCommunicateDto pqsCommunicateDto); + } diff --git a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/PqsCommunicateFeignClientFallbackFactory.java b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/PqsCommunicateFeignClientFallbackFactory.java index f36afbd..5c0ca5d 100644 --- a/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/PqsCommunicateFeignClientFallbackFactory.java +++ b/data-processing/data-processing-api/src/main/java/com/njcn/dataProcess/api/fallback/PqsCommunicateFeignClientFallbackFactory.java @@ -57,6 +57,12 @@ public class PqsCommunicateFeignClientFallbackFactory implements FallbackFactory log.error("{}异常,降级处理,异常为:{}","获取是否有当天最后一条数据",cause.toString()); throw new BusinessException(finalExceptionEnum); } + + @Override + public HttpResult insertion(PqsCommunicateDto pqsCommunicateDto) { + log.error("{}异常,降级处理,异常为:{}","通讯表入库",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } }; } } diff --git a/data-processing/data-processing-boot/pom.xml b/data-processing/data-processing-boot/pom.xml index 19945f2..461a696 100644 --- a/data-processing/data-processing-boot/pom.xml +++ b/data-processing/data-processing-boot/pom.xml @@ -62,6 +62,17 @@ 1.0.0 compile + + com.njcn + pq-device-api + ${project.version} + + + pqs-influx + com.njcn + + + diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/PqsCommunicateController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/PqsCommunicateController.java index 17170ca..767c1b9 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/PqsCommunicateController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/PqsCommunicateController.java @@ -71,4 +71,13 @@ public class PqsCommunicateController extends BaseController { return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, rawData, methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/insertion") + @ApiOperation("插入数据") + public HttpResult insertion(@RequestBody PqsCommunicateDto pqsCommunicateDto) { + String methodDescribe = getMethodDescribe("insertion"); + pqsCommunicateInsert.insertion(pqsCommunicateDto); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IPqsCommunicate.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IPqsCommunicate.java index 287daed..3f3df28 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IPqsCommunicate.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/IPqsCommunicate.java @@ -34,4 +34,5 @@ public interface IPqsCommunicate { */ List getRawDataEnd(LineCountEvaluateParam lineParam); + void insertion(PqsCommunicateDto pqsCommunicateDto); } diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java index 86ceca5..0f2967c 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/influxdb/InfluxdbPqsCommunicateImpl.java @@ -2,21 +2,30 @@ package com.njcn.dataProcess.service.impl.influxdb; import com.njcn.dataProcess.dao.imapper.PqsCommunicateMapper; import com.njcn.dataProcess.param.LineCountEvaluateParam; +import com.njcn.dataProcess.po.influx.DataHarmrateV; import com.njcn.dataProcess.po.influx.DataV; import com.njcn.dataProcess.po.influx.PqsCommunicate; +import com.njcn.dataProcess.pojo.dto.DataHarmDto; import com.njcn.dataProcess.pojo.dto.PqsCommunicateDto; import com.njcn.dataProcess.service.IPqsCommunicate; +import com.njcn.device.pq.api.DeviceFeignClient; +import com.njcn.device.pq.pojo.dto.DevComFlagDTO; import com.njcn.influx.query.InfluxQueryWrapper; import lombok.RequiredArgsConstructor; import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; import javax.annotation.Resource; +import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; /** @@ -31,7 +40,8 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { @Resource private PqsCommunicateMapper pqsCommunicateMapper; private final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()); - + @Autowired + private DeviceFeignClient deviceFeignClient; @Override public List getRawDataLatest(LineCountEvaluateParam lineParam) { List result = new ArrayList<>(); @@ -96,6 +106,36 @@ public class InfluxdbPqsCommunicateImpl implements IPqsCommunicate { return result; } + @Override + public void insertion(PqsCommunicateDto pqsCommunicateDto) { + //获取最新一条数据 + PqsCommunicate dto = new PqsCommunicate(); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(PqsCommunicate.class); + influxQueryWrapper.eq(PqsCommunicate::getDevId,pqsCommunicateDto.getDevId()).timeDesc().limit(1); + List pqsCommunicates = pqsCommunicateMapper.selectByQueryWrapper(influxQueryWrapper); + + PqsCommunicate pqsCommunicate = new PqsCommunicate(); + pqsCommunicate.setTime(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER).atZone(ZoneId.systemDefault()).toInstant()); + pqsCommunicate.setDevId(pqsCommunicateDto.getDevId()); + pqsCommunicate.setType(pqsCommunicateDto.getType()); + //如果不存数据或者状态不一样则插入数据 + if(CollectionUtils.isEmpty(pqsCommunicates)|| !Objects.equals( pqsCommunicates.get(0).getType(),pqsCommunicateDto.getType())){ + pqsCommunicateMapper.insertOne(pqsCommunicate); + //更新mysql数据 + DevComFlagDTO devComFlagDTO = new DevComFlagDTO(); + devComFlagDTO.setId(pqsCommunicateDto.getDevId()); + devComFlagDTO.setDate(LocalDateTime.parse(pqsCommunicateDto.getTime(), DATE_TIME_FORMATTER)); + devComFlagDTO.setStatus(pqsCommunicateDto.getType()); + + deviceFeignClient.updateDevComFlag(devComFlagDTO); + } + + + + + } + + /** * 按监测点集合、时间条件获取dataV分钟数据 * timeMap参数来判断是否进行数据出来 timeMap为空则不进行数据处理 diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationPqsCommunicateImpl.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationPqsCommunicateImpl.java index 11088eb..4a2b83c 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationPqsCommunicateImpl.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/service/impl/relation/RelationPqsCommunicateImpl.java @@ -33,4 +33,9 @@ public class RelationPqsCommunicateImpl implements IPqsCommunicate { public List getRawDataEnd(LineCountEvaluateParam lineParam) { return Collections.emptyList(); } + + @Override + public void insertion(PqsCommunicateDto pqsCommunicateDto) { + + } } diff --git a/message/message-api/src/main/java/com/njcn/message/messagedto/DevComFlagDTO.java b/message/message-api/src/main/java/com/njcn/message/messagedto/DevComFlagDTO.java new file mode 100644 index 0000000..c30ba4a --- /dev/null +++ b/message/message-api/src/main/java/com/njcn/message/messagedto/DevComFlagDTO.java @@ -0,0 +1,26 @@ +package com.njcn.message.messagedto; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.njcn.middle.rocket.domain.BaseMessage; +import lombok.Data; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * Description: + * Date: 2024/11/5 15:07【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class DevComFlagDTO extends BaseMessage implements Serializable { + + private String id; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime date; + private Integer status; + + +} diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java new file mode 100644 index 0000000..74325a9 --- /dev/null +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java @@ -0,0 +1,143 @@ +package com.njcn.message.consumer; + +import com.alibaba.fastjson.JSONObject; +import com.njcn.message.constant.MessageStatus; +import com.njcn.message.messagedto.DevComFlagDTO; +import com.njcn.message.messagedto.MessageDataDTO; +import com.njcn.message.websocket.WebSocketServer; +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.stat.api.MessAnalysisFeignClient; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * Description: + * Date: 2024/12/13 10:06【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Component +@RocketMQMessageListener( + topic = "Device_Run_Flag_Topic", + consumerGroup = "Device_Run_Flag_Consumer", + selectorExpression = "Test_Tag||Test_Keys", + consumeThreadNumber = 10, + enableMsgTrace = true +) +@Slf4j +public class DeviceRunFlagDataConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + @Autowired + private MessAnalysisFeignClient messAnalysisFeignClient; + @Override + public void onMessage(String message) { + DevComFlagDTO devComFlagDTO = JSONObject.parseObject(message,DevComFlagDTO.class); + super.dispatchMessage(devComFlagDTO); + + } + + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(DevComFlagDTO message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L); + return false; + } + return true; + } + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(DevComFlagDTO message) { + redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + + @Override + protected void handleMessage(DevComFlagDTO message) { + messAnalysisFeignClient.handleDevRunflag(message); + } + + + + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(DevComFlagDTO message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + + +}