Merge pull request 'master-temp' (#2) from master-temp into master-jb
Reviewed-on: #2
This commit is contained in:
@@ -0,0 +1,151 @@
|
||||
package com.njcn.message.consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.njcn.message.constant.MessageStatus;
|
||||
import com.njcn.message.constant.RedisKeyPrefix;
|
||||
import com.njcn.message.messagedto.FileSysDTO;
|
||||
import com.njcn.message.messagedto.MessageDataDTO;
|
||||
import com.njcn.message.websocket.WebSocketServer;
|
||||
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Description:
|
||||
* Date: 2024/12/13 10:06【需求编号】
|
||||
*
|
||||
* @author clam
|
||||
* @version V1.0.0
|
||||
*/
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "File_Reply_Topic",
|
||||
consumerGroup = "file_sys_consumer",
|
||||
consumeThreadNumber = 10,
|
||||
enableMsgTrace = true
|
||||
)
|
||||
@Slf4j
|
||||
public class FileSysDataConsumer extends EnhanceConsumerMessageHandler<FileSysDTO> implements RocketMQListener<String> {
|
||||
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Resource
|
||||
private RocketMqLogFeignClient rocketMqLogFeignClient;
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
|
||||
FileSysDTO messageDataDTO = JSONObject.parseObject(message, FileSysDTO.class);
|
||||
super.dispatchMessage(messageDataDTO);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/***
|
||||
* 通过redis分布式锁判断当前消息所处状态
|
||||
* 1、null 查不到该key的数据,属于第一次消费,放行
|
||||
* 2、fail 上次消息消费时发生异常,放行
|
||||
* 3、being processed 正在处理,打回去
|
||||
* 4、success 最近72小时消费成功,避免重复消费,打回去
|
||||
*/
|
||||
@Override
|
||||
public boolean filter(FileSysDTO message) {
|
||||
String keyStatus = redisUtil.getStringByKey(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()));
|
||||
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* 消费成功,缓存到redis5分钟,避免重复消费
|
||||
*/
|
||||
@Override
|
||||
protected void consumeSuccess(FileSysDTO message) {
|
||||
// redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void handleMessage(FileSysDTO message) {
|
||||
|
||||
String msgId = message.getGuid();
|
||||
// 1. 将结果存入 Redis,过期时间 60 秒(足够完成请求)
|
||||
String key = "pending:" + msgId;
|
||||
redisUtil.saveByKeyWithExpire(key, message, 60L);
|
||||
// 2. 发布 Redis 通知,让等待的实例知道结果已就绪
|
||||
stringRedisTemplate.convertAndSend("result_ready_channel", msgId);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 发生异常时,进行错误信息入库保存
|
||||
* 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存
|
||||
*/
|
||||
@Override
|
||||
protected void saveExceptionMsgLog(FileSysDTO message, String identity, Exception exception) {
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
|
||||
rocketmqMsgErrorLog.setMsgKey(message.getKey());
|
||||
rocketmqMsgErrorLog.setResource(message.getSource());
|
||||
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
|
||||
//数据库字段配置长度200,避免插入失败,大致分析异常原因
|
||||
String exceptionMsg = exception.getMessage();
|
||||
if(exceptionMsg.length() > 200){
|
||||
exceptionMsg = exceptionMsg.substring(0,180);
|
||||
}
|
||||
rocketmqMsgErrorLog.setRecord(exceptionMsg);
|
||||
//如果是当前消息重试的则略过
|
||||
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
|
||||
//单次消费异常
|
||||
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||
}
|
||||
} else {
|
||||
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
|
||||
//重试N次后,依然消费异常
|
||||
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/***
|
||||
* 处理失败后,是否重试
|
||||
* 一般开启
|
||||
*/
|
||||
@Override
|
||||
protected boolean isRetry() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/***
|
||||
* 消费失败是否抛出异常,抛出异常后就不再消费了
|
||||
*/
|
||||
@Override
|
||||
protected boolean throwException() {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -7,9 +7,11 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.response.HttpResult;
|
||||
import com.njcn.common.utils.HttpResultUtil;
|
||||
|
||||
import com.njcn.message.message.AskFileSysMessage;
|
||||
import com.njcn.message.message.DeviceRebootMessage;
|
||||
import com.njcn.message.message.ProcessRebootMessage;
|
||||
import com.njcn.message.message.RecallMessage;
|
||||
import com.njcn.message.produce.template.AskFileSysMessaggeTemplate;
|
||||
import com.njcn.message.produce.template.DeviceRebootMessageTemplate;
|
||||
import com.njcn.message.produce.template.ProcessRebootMessageTemplate;
|
||||
import com.njcn.message.produce.template.RecallMessaggeTemplate;
|
||||
@@ -40,6 +42,7 @@ public class ProduceController extends BaseController {
|
||||
|
||||
private final ProcessRebootMessageTemplate processRebootMessageTemplate;
|
||||
|
||||
private final AskFileSysMessaggeTemplate askFileSysMessaggeTemplate;
|
||||
|
||||
@PostMapping("/recall")
|
||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||
@@ -76,4 +79,14 @@ public class ProduceController extends BaseController {
|
||||
processRebootMessageTemplate.sendMember(baseMessage,message.getNodeId());
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
|
||||
@PostMapping("/askFileSys")
|
||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||
@ApiOperation("请求文件系统")
|
||||
@ApiImplicitParam(name = "message", value = "参数", required = true)
|
||||
public HttpResult<String> askFileSys(@RequestBody AskFileSysMessage message){
|
||||
String methodDescribe = getMethodDescribe("askFileSys");
|
||||
SendResult sendResult = askFileSysMessaggeTemplate.sendMember(message);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS,message.getGuid() , methodDescribe);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.njcn.message.produce.template;
|
||||
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.njcn.message.constant.BusinessResource;
|
||||
import com.njcn.message.constant.BusinessTopic;
|
||||
import com.njcn.message.message.AskFileSysMessage;
|
||||
import com.njcn.middle.rocket.domain.BaseMessage;
|
||||
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Description:
|
||||
* Date: 2024/12/13 15:15【需求编号】
|
||||
*
|
||||
* @author clam
|
||||
* @version V1.0.0
|
||||
*/
|
||||
@Component
|
||||
public class AskFileSysMessaggeTemplate extends RocketMQEnhanceTemplate {
|
||||
public AskFileSysMessaggeTemplate(RocketMQTemplate template) {
|
||||
super(template);
|
||||
}
|
||||
|
||||
public SendResult sendMember(AskFileSysMessage askRealDataMessage) {
|
||||
BaseMessage baseMessage = new BaseMessage();
|
||||
baseMessage.setMessageBody(JSONObject.toJSONString(askRealDataMessage));
|
||||
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
|
||||
baseMessage.setKey(askRealDataMessage.getGuid());
|
||||
return send(BusinessTopic.FILE_TOPIC,askRealDataMessage.getNodeId() , baseMessage);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user