diff --git a/message/message-api/src/main/java/com/njcn/message/api/ProduceFeignClient.java b/message/message-api/src/main/java/com/njcn/message/api/ProduceFeignClient.java index a306780..51f7755 100644 --- a/message/message-api/src/main/java/com/njcn/message/api/ProduceFeignClient.java +++ b/message/message-api/src/main/java/com/njcn/message/api/ProduceFeignClient.java @@ -6,6 +6,7 @@ import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.message.api.fallback.ProduceFeignClientFallbackFactory; +import com.njcn.message.message.AskFileSysMessage; import com.njcn.message.message.DeviceRebootMessage; import com.njcn.message.message.ProcessRebootMessage; import com.njcn.message.message.RecallMessage; @@ -38,4 +39,7 @@ public interface ProduceFeignClient { public HttpResult askRestartProcess(@RequestBody ProcessRebootMessage message); + @PostMapping("/askFileSys") + public HttpResult askFileSys(@RequestBody AskFileSysMessage message); + } diff --git a/message/message-api/src/main/java/com/njcn/message/api/fallback/ProduceFeignClientFallbackFactory.java b/message/message-api/src/main/java/com/njcn/message/api/fallback/ProduceFeignClientFallbackFactory.java index f0fc937..c80749b 100644 --- a/message/message-api/src/main/java/com/njcn/message/api/fallback/ProduceFeignClientFallbackFactory.java +++ b/message/message-api/src/main/java/com/njcn/message/api/fallback/ProduceFeignClientFallbackFactory.java @@ -5,6 +5,7 @@ import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.response.HttpResult; import com.njcn.message.api.ProduceFeignClient; +import com.njcn.message.message.AskFileSysMessage; import com.njcn.message.message.DeviceRebootMessage; import com.njcn.message.message.ProcessRebootMessage; import com.njcn.message.message.RecallMessage; @@ -53,6 +54,12 @@ public class ProduceFeignClientFallbackFactory implements FallbackFactory askFileSys(AskFileSysMessage message) { + log.error("{}异常,降级处理,异常为:{}", "请求装置文件系统", throwable.toString()); + throw new BusinessException(finalExceptionEnum); + } }; } } diff --git a/message/message-api/src/main/java/com/njcn/message/constant/BusinessTopic.java b/message/message-api/src/main/java/com/njcn/message/constant/BusinessTopic.java index 7346c63..1b33df0 100644 --- a/message/message-api/src/main/java/com/njcn/message/constant/BusinessTopic.java +++ b/message/message-api/src/main/java/com/njcn/message/constant/BusinessTopic.java @@ -37,6 +37,9 @@ public interface BusinessTopic { String ASK_REAL_DATA_TOPIC = "ask_real_data_topic"; String RECALL_TOPIC = "recall_Topic"; + String FILE_TOPIC = "File_Topic"; + + /********************************数据中心*********************************/ diff --git a/message/message-api/src/main/java/com/njcn/message/message/AskFileSysMessage.java b/message/message-api/src/main/java/com/njcn/message/message/AskFileSysMessage.java new file mode 100644 index 0000000..1efdb9e --- /dev/null +++ b/message/message-api/src/main/java/com/njcn/message/message/AskFileSysMessage.java @@ -0,0 +1,36 @@ +package com.njcn.message.message; + +import lombok.Data; + +/** + * Description: + * Date: 2024/12/13 10:15【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class AskFileSysMessage { + + + /** + * guid : sdghsfdhfdhdfhdfghd234234534534 + * frontid : dhdfhdfghd2342 + * ProcessNo : 1 + * devid : 167456737637374567 + * type : 2 + * Path : /remote/vol1_stat.txt + * DevPath : /etc/vol1_stat.txt + */ + + private String guid; + private String nodeId; + private int processNo; + private String devId; + private int type; + private String path; + private String remotePath; + + +} + diff --git a/message/message-api/src/main/java/com/njcn/message/messagedto/FileSysDTO.java b/message/message-api/src/main/java/com/njcn/message/messagedto/FileSysDTO.java new file mode 100644 index 0000000..90ee48a --- /dev/null +++ b/message/message-api/src/main/java/com/njcn/message/messagedto/FileSysDTO.java @@ -0,0 +1,26 @@ +package com.njcn.message.messagedto; + +import com.njcn.middle.rocket.domain.BaseMessage; +import lombok.Data; + +import java.io.Serializable; + +/** + * Description: + * Date: 2026/04/29 下午 3:16【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class FileSysDTO extends BaseMessage implements Serializable { + private String guid; + private String nodeId; + private int processNo; + private String devId; + private int type; + private String path; + private String devPath; + private int result; + private T detail; +} diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/FileSysDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/FileSysDataConsumer.java new file mode 100644 index 0000000..72397e5 --- /dev/null +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/FileSysDataConsumer.java @@ -0,0 +1,151 @@ +package com.njcn.message.consumer; + +import com.alibaba.fastjson.JSONObject; +import com.njcn.message.constant.MessageStatus; +import com.njcn.message.constant.RedisKeyPrefix; +import com.njcn.message.messagedto.FileSysDTO; +import com.njcn.message.messagedto.MessageDataDTO; +import com.njcn.message.websocket.WebSocketServer; +import com.njcn.middle.rocket.constant.EnhanceMessageConstant; +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.system.api.RocketMqLogFeignClient; +import com.njcn.system.pojo.po.RocketmqMsgErrorLog; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * Description: + * Date: 2024/12/13 10:06【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Component +@RocketMQMessageListener( + topic = "File_Reply_Topic", + consumerGroup = "file_sys_consumer", + consumeThreadNumber = 10, + enableMsgTrace = true +) +@Slf4j +public class FileSysDataConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RedisUtil redisUtil; + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private RocketMqLogFeignClient rocketMqLogFeignClient; + @Override + public void onMessage(String message) { + + FileSysDTO messageDataDTO = JSONObject.parseObject(message, FileSysDTO.class); + super.dispatchMessage(messageDataDTO); + + } + + + + /*** + * 通过redis分布式锁判断当前消息所处状态 + * 1、null 查不到该key的数据,属于第一次消费,放行 + * 2、fail 上次消息消费时发生异常,放行 + * 3、being processed 正在处理,打回去 + * 4、success 最近72小时消费成功,避免重复消费,打回去 + */ + @Override + public boolean filter(FileSysDTO message) { + String keyStatus = redisUtil.getStringByKey(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey())); + if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { + + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L); + return false; + } + return true; + } + /** + * 消费成功,缓存到redis5分钟,避免重复消费 + */ + @Override + protected void consumeSuccess(FileSysDTO message) { +// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L); + } + + + @Override + protected void handleMessage(FileSysDTO message) { + + String msgId = message.getGuid(); + // 1. 将结果存入 Redis,过期时间 60 秒(足够完成请求) + String key = "pending:" + msgId; + redisUtil.saveByKeyWithExpire(key, message, 60L); + // 2. 发布 Redis 通知,让等待的实例知道结果已就绪 + stringRedisTemplate.convertAndSend("result_ready_channel", msgId); + } + + + + + /** + * 发生异常时,进行错误信息入库保存 + * 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存 + */ + @Override + protected void saveExceptionMsgLog(FileSysDTO message, String identity, Exception exception) { + redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); + RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog(); + rocketmqMsgErrorLog.setMsgKey(message.getKey()); + rocketmqMsgErrorLog.setResource(message.getSource()); + if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) { + //数据库字段配置长度200,避免插入失败,大致分析异常原因 + String exceptionMsg = exception.getMessage(); + if(exceptionMsg.length() > 200){ + exceptionMsg = exceptionMsg.substring(0,180); + } + rocketmqMsgErrorLog.setRecord(exceptionMsg); + //如果是当前消息重试的则略过 + if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ + //单次消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } else { + rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。"); + //重试N次后,依然消费异常 + rocketMqLogFeignClient.add(rocketmqMsgErrorLog); + } + } + + + /*** + * 处理失败后,是否重试 + * 一般开启 + */ + @Override + protected boolean isRetry() { + return true; + } + + + /*** + * 消费失败是否抛出异常,抛出异常后就不再消费了 + */ + @Override + protected boolean throwException() { + return false; + } + + + + +} diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/ProduceController.java b/message/message-boot/src/main/java/com/njcn/message/produce/ProduceController.java index eb80028..f9a768c 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/ProduceController.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/ProduceController.java @@ -7,9 +7,11 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; +import com.njcn.message.message.AskFileSysMessage; import com.njcn.message.message.DeviceRebootMessage; import com.njcn.message.message.ProcessRebootMessage; import com.njcn.message.message.RecallMessage; +import com.njcn.message.produce.template.AskFileSysMessaggeTemplate; import com.njcn.message.produce.template.DeviceRebootMessageTemplate; import com.njcn.message.produce.template.ProcessRebootMessageTemplate; import com.njcn.message.produce.template.RecallMessaggeTemplate; @@ -40,6 +42,7 @@ public class ProduceController extends BaseController { private final ProcessRebootMessageTemplate processRebootMessageTemplate; + private final AskFileSysMessaggeTemplate askFileSysMessaggeTemplate; @PostMapping("/recall") @OperateInfo(info = LogEnum.BUSINESS_COMMON) @@ -76,4 +79,14 @@ public class ProduceController extends BaseController { processRebootMessageTemplate.sendMember(baseMessage,message.getNodeId()); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } + + @PostMapping("/askFileSys") + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("请求文件系统") + @ApiImplicitParam(name = "message", value = "参数", required = true) + public HttpResult askFileSys(@RequestBody AskFileSysMessage message){ + String methodDescribe = getMethodDescribe("askFileSys"); + SendResult sendResult = askFileSysMessaggeTemplate.sendMember(message); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS,message.getGuid() , methodDescribe); + } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/AskFileSysMessaggeTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/AskFileSysMessaggeTemplate.java new file mode 100644 index 0000000..0139672 --- /dev/null +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/AskFileSysMessaggeTemplate.java @@ -0,0 +1,34 @@ +package com.njcn.message.produce.template; + + +import com.alibaba.fastjson.JSONObject; +import com.njcn.message.constant.BusinessResource; +import com.njcn.message.constant.BusinessTopic; +import com.njcn.message.message.AskFileSysMessage; +import com.njcn.middle.rocket.domain.BaseMessage; +import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.stereotype.Component; + +/** + * Description: + * Date: 2024/12/13 15:15【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Component +public class AskFileSysMessaggeTemplate extends RocketMQEnhanceTemplate { + public AskFileSysMessaggeTemplate(RocketMQTemplate template) { + super(template); + } + + public SendResult sendMember(AskFileSysMessage askRealDataMessage) { + BaseMessage baseMessage = new BaseMessage(); + baseMessage.setMessageBody(JSONObject.toJSONString(askRealDataMessage)); + baseMessage.setSource(BusinessResource.WEB_RESOURCE); + baseMessage.setKey(askRealDataMessage.getGuid()); + return send(BusinessTopic.FILE_TOPIC,askRealDataMessage.getNodeId() , baseMessage); + } +}