1、微调
This commit is contained in:
@@ -7,6 +7,7 @@ import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.ZoneId;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
@@ -38,7 +39,7 @@ public class MessageRetryRedisDAO {
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public void addToRetryQueue(MessageRecordDO message) {
|
||||
public boolean addToRetryQueue(MessageRecordDO message) {
|
||||
String key = getRetryQueueKey(message.getChannel());
|
||||
double score = message.getNextRetryTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
||||
|
||||
@@ -46,9 +47,12 @@ public class MessageRetryRedisDAO {
|
||||
redisTemplate.opsForZSet().add(key, message.getMessageId(), score);
|
||||
log.debug("添加消息到重试队列成功: channel={}, messageId={}, nextRetryTime={}",
|
||||
message.getChannel(), message.getMessageId(), message.getNextRetryTime());
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("添加消息到重试队列失败: channel={}, messageId={}", message.getChannel(), message.getMessageId(), e);
|
||||
throw e;
|
||||
log.warn("Redis 入队失败,当前仅保留数据库重试记录等待后续恢复: channel={}, messageId={}",
|
||||
message.getChannel(), message.getMessageId());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,14 +62,17 @@ public class MessageRetryRedisDAO {
|
||||
* @param channel 渠道类型
|
||||
* @param messageId 消息ID
|
||||
*/
|
||||
public void removeFromRetryQueue(String channel, String messageId) {
|
||||
public boolean removeFromRetryQueue(String channel, String messageId) {
|
||||
String key = getRetryQueueKey(channel);
|
||||
|
||||
try {
|
||||
redisTemplate.opsForZSet().remove(key, messageId);
|
||||
log.debug("从重试队列移除消息成功: channel={}, messageId={}", channel, messageId);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("从重试队列移除消息失败: channel={}, messageId={}", channel, messageId, e);
|
||||
log.warn("Redis 出队失败,当前消息可能存在待清理的 Redis 脏数据: channel={}, messageId={}", channel, messageId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +91,7 @@ public class MessageRetryRedisDAO {
|
||||
return redisTemplate.opsForZSet().rangeByScore(key, 0, currentTime, 0, limit);
|
||||
} catch (Exception e) {
|
||||
log.error("获取需要重试的消息ID失败: channel={}", channel, e);
|
||||
return null;
|
||||
return Collections.emptySet();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ public class ChannelProviderConfigServiceImpl extends ServiceImpl<ChannelProvide
|
||||
public ChannelProviderConfigDO toggleEnableField(String id) {
|
||||
ChannelProviderConfigDO channelProviderConfigDO = this.getById(id);
|
||||
channelProviderConfigDO.setEnabled(channelProviderConfigDO.getEnabled() ^ 0X0001);
|
||||
this.updateById(channelProviderConfigDO);
|
||||
return channelProviderConfigDO;
|
||||
}
|
||||
|
||||
|
||||
@@ -233,7 +233,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
|
||||
|
||||
@Override
|
||||
public MessageRecordDO getById(String messageId) {
|
||||
return this.lambdaQuery().eq(MessageRecordDO::getMessageId, messageId).eq(MessageRecordDO::getDeleted, true).one();
|
||||
return this.lambdaQuery().eq(MessageRecordDO::getMessageId, messageId).one();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -265,6 +265,6 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
|
||||
|
||||
@Override
|
||||
public boolean delete(List<Long> ids) {
|
||||
return this.lambdaUpdate().in(MessageRecordDO::getId, ids).set(MessageRecordDO::getDeleted, false).update();
|
||||
return this.removeByIds(ids);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ public class RateLimitConfigServiceImpl extends ServiceImpl<RateLimitConfigMappe
|
||||
Integer dailyQuota = rateLimitConfigDO.getDailyLimit();
|
||||
Integer count = rateLimitRedisDAO.get(messageRecordDO.getChannel(), messageRecordDO.getAppName(), messageRecordDO.getReceiver());
|
||||
if (count >= dailyQuota) {
|
||||
messageRecordDO.setStatus(MsgStatusConstant.QUOTAEXCEEDED);
|
||||
messageRecordDO.setStatus(MsgStatusConstant.RATE_LIMITED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.njcn.msgpush.module.push.service.retry;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.njcn.msgpush.framework.common.pojo.PageResult;
|
||||
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
|
||||
@@ -63,7 +64,9 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
||||
retryRecord.setNextRetryTime(nextRetryTime);
|
||||
message.setNextRetryTime(nextRetryTime);
|
||||
this.save(retryRecord);
|
||||
messageRetryRedisDAO.addToRetryQueue(message);
|
||||
if (!messageRetryRedisDAO.addToRetryQueue(message)) {
|
||||
log.warn("消息已写入数据库重试队列,但 Redis 入队失败: messageId={}", message.getMessageId());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -86,7 +89,9 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
||||
existing.setNextRetryTime(nextRetryTime);
|
||||
message.setNextRetryTime(nextRetryTime);
|
||||
this.updateById(existing);
|
||||
messageRetryRedisDAO.addToRetryQueue(message);
|
||||
if (!messageRetryRedisDAO.addToRetryQueue(message)) {
|
||||
log.warn("消息已更新数据库重试队列,但 Redis 入队失败: messageId={}", message.getMessageId());
|
||||
}
|
||||
}
|
||||
|
||||
private LocalDateTime calculateNextRetryTime(String channel, int retryCount) {
|
||||
@@ -163,7 +168,15 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
|
||||
|
||||
@Override
|
||||
public PageResult<MessageRetryQueueDO> getPage(MessageRetryQueueReqVO reqVO) {
|
||||
return null;
|
||||
LambdaQueryWrapper<MessageRetryQueueDO> queryWrapper = new LambdaQueryWrapper<MessageRetryQueueDO>()
|
||||
.eq(StrUtil.isNotBlank(reqVO.getMessageId()), MessageRetryQueueDO::getMessageId, reqVO.getMessageId())
|
||||
.eq(StrUtil.isNotBlank(reqVO.getChannel()), MessageRetryQueueDO::getChannel, reqVO.getChannel())
|
||||
.eq(StrUtil.isNotBlank(reqVO.getReceiver()), MessageRetryQueueDO::getReceiver, reqVO.getReceiver())
|
||||
.ge(ObjectUtil.isNotNull(reqVO.getMinRetryCount()), MessageRetryQueueDO::getRetryCount, reqVO.getMinRetryCount())
|
||||
.le(ObjectUtil.isNotNull(reqVO.getMaxRetryCount()), MessageRetryQueueDO::getRetryCount, reqVO.getMaxRetryCount())
|
||||
.orderByAsc(MessageRetryQueueDO::getNextRetryTime)
|
||||
.orderByDesc(MessageRetryQueueDO::getId);
|
||||
return this.baseMapper.selectPage(reqVO, queryWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user