diff --git a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java index 4934635..b3b49c8 100644 --- a/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java +++ b/data-processing/data-processing-boot/src/main/java/com/njcn/dataProcess/controller/DataRecallController.java @@ -3,7 +3,6 @@ package com.njcn.dataProcess.controller;//package com.njcn.message.websocket; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.util.IdUtil; -import com.alibaba.fastjson.JSONObject; import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.enums.response.CommonResponseEnum; @@ -11,7 +10,6 @@ import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; import com.njcn.dataProcess.annotation.QueryBean; -import com.njcn.dataProcess.api.DataVFeignClient; import com.njcn.dataProcess.service.IDataV; import com.njcn.device.biz.commApi.CommTerminalGeneralClient; import com.njcn.device.biz.pojo.dto.LineDevGetDTO; @@ -19,7 +17,6 @@ import com.njcn.device.pq.api.DeviceFeignClient; import com.njcn.device.pq.pojo.dto.DeviceDTO; import com.njcn.message.api.ProduceFeignClient; import com.njcn.message.message.RecallMessage; -import com.njcn.middle.rocket.domain.BaseMessage; import com.njcn.web.controller.BaseController; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; @@ -38,7 +35,6 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; diff --git a/message/message-api/src/main/java/com/njcn/message/messagedto/FrontLogslMessage.java b/message/message-api/src/main/java/com/njcn/message/messagedto/FrontLogslMessage.java new file mode 100644 index 0000000..3a0753a --- /dev/null +++ b/message/message-api/src/main/java/com/njcn/message/messagedto/FrontLogslMessage.java @@ -0,0 +1,25 @@ +package com.njcn.message.messagedto; + +import com.njcn.middle.rocket.domain.BaseMessage; +import lombok.Data; + +import java.io.Serializable; + +/** + * Description: + * Date: 2024/12/13 10:15【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class FrontLogslMessage extends BaseMessage implements Serializable { + private String nodeId; + private String processNo; + private String businessId; + private String level; + private String logType; + private String grade; + private String frontType; + private String log; +} diff --git a/message/message-boot/pom.xml b/message/message-boot/pom.xml index b7676be..a5e3fc3 100644 --- a/message/message-boot/pom.xml +++ b/message/message-boot/pom.xml @@ -75,6 +75,11 @@ rt-api 1.0.0 + + com.njcn + system-api + 1.0.0 + com.njcn pq-device-api diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java new file mode 100644 index 0000000..4843988 --- /dev/null +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java @@ -0,0 +1,150 @@ +package com.njcn.message.consumer; + +import com.alibaba.fastjson.JSONObject; +import com.njcn.message.constant.MessageStatus; +import com.njcn.message.constant.RedisKeyPrefix; +import com.njcn.message.messagedto.FrontLogslMessage; +import com.njcn.message.messagedto.TopicReplyDTO; +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.redis.pojo.enums.AppRedisKey; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.stat.api.MessAnalysisFeignClient; +import com.njcn.system.api.FrontLogsFeignClient; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.dto.PqFrontLogsDTO; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Objects; + +/** + * Description: + * Date: 2024/12/13 10:06【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Component +@RocketMQMessageListener( + topic = "log_Topic", + consumerGroup = "Log_Topic_Consumer", + selectorExpression = "Test_Tag||Test_Keys", + consumeThreadNumber = 10, + enableMsgTrace = true +) +@Slf4j +public class TopicLogsConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + @Resource + private FrontLogsFeignClient frontLogsFeignClient; + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + + @Override + public void onMessage(String message) { + FrontLogslMessage frontLogslMessage = JSONObject.parseObject(message,FrontLogslMessage.class); + super.dispatchMessage(frontLogslMessage); + + } + + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(FrontLogslMessage message) { + String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L); + return false; + } + return true; + } + /** + * 消费成功,缓存到redis72小时,避免重复消费 + */ + @Override + protected void consumeSuccess(FrontLogslMessage message) { + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + } + + + @Override + protected void handleMessage(FrontLogslMessage message) { + + //业务处理 + PqFrontLogsDTO pqFrontLogsDTO = new PqFrontLogsDTO(); + BeanUtils.copyProperties(message,pqFrontLogsDTO); + frontLogsFeignClient.addFrontLogs(pqFrontLogsDTO); + } + + + + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(FrontLogslMessage message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + + +}