设置日志等topic不做重复消费炒作

This commit is contained in:
hzj
2025-08-06 13:25:26 +08:00
parent 030ecc441f
commit 4b9ce14516
6 changed files with 25 additions and 25 deletions

View File

@@ -76,7 +76,7 @@ public class DeviceRunFlagDataConsumer extends EnhanceConsumerMessageHandler<Dev
*/ */
@Override @Override
protected void consumeSuccess(DevComFlagDTO message) { protected void consumeSuccess(DevComFlagDTO message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
} }

View File

@@ -94,11 +94,11 @@ public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageData
return true; return true;
} }
/** /**
* 消费成功缓存到redis72小时,避免重复消费 * 消费成功缓存到redis5分钟,避免重复消费
*/ */
@Override @Override
protected void consumeSuccess(MessageDataDTO message) { protected void consumeSuccess(MessageDataDTO message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HARMMONIC_TOPIC.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HARMMONIC_TOPIC.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
} }

View File

@@ -55,7 +55,7 @@ public class FrontHeartBeatConsumer extends EnhanceConsumerMessageHandler<FrontH
} }
//本消息不需要控制重复消费
/*** /***
* 通过redis分布式锁判断当前消息所处状态 * 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行 * 1、null 查不到该key的数据属于第一次消费放行
@@ -65,11 +65,11 @@ public class FrontHeartBeatConsumer extends EnhanceConsumerMessageHandler<FrontH
*/ */
@Override @Override
public boolean filter(FrontHeartBeatDTO message) { public boolean filter(FrontHeartBeatDTO message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); // String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { // if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L); // redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
return false; // return false;
} // }
return true; return true;
} }
/** /**
@@ -77,7 +77,7 @@ public class FrontHeartBeatConsumer extends EnhanceConsumerMessageHandler<FrontH
*/ */
@Override @Override
protected void consumeSuccess(FrontHeartBeatDTO message) { protected void consumeSuccess(FrontHeartBeatDTO message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); // redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HEART_BEAT.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
} }

View File

@@ -70,11 +70,11 @@ public class RealTimeDataConsumer extends EnhanceConsumerMessageHandler<MessageD
return true; return true;
} }
/** /**
* 消费成功缓存到redis72小时,避免重复消费 * 消费成功缓存到redis5分钟,避免重复消费
*/ */
@Override @Override
protected void consumeSuccess(MessageDataDTO message) { protected void consumeSuccess(MessageDataDTO message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, 5*60L);
} }

View File

@@ -65,19 +65,19 @@ public class TopicLogsConsumer extends EnhanceConsumerMessageHandler<FrontLogslM
*/ */
@Override @Override
public boolean filter(FrontLogslMessage message) { public boolean filter(FrontLogslMessage message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); // String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { // if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L); // redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
// return false;
// }
return false; return false;
} }
return true;
}
/** /**
* 消费成功缓存到redis72小时避免重复消费 * 消费成功缓存到redis72小时避免重复消费
*/ */
@Override @Override
protected void consumeSuccess(FrontLogslMessage message) { protected void consumeSuccess(FrontLogslMessage message) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime()); // redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
} }

View File

@@ -64,13 +64,13 @@ public class TopicReplyConsumer extends EnhanceConsumerMessageHandler<TopicReply
*/ */
@Override @Override
public boolean filter(TopicReplyDTO message) { public boolean filter(TopicReplyDTO message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey())); // String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) { // if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L); // redisUtil.saveByKeyWithExpire(RedisKeyPrefix.TOPIC_REPLY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
// return false;
// }
return false; return false;
} }
return true;
}
/** /**
* 消费成功缓存到redis72小时避免重复消费 * 消费成功缓存到redis72小时避免重复消费
*/ */