This commit is contained in:
caozehui
2026-03-30 13:58:03 +08:00
parent be7c1b7b26
commit 87757b352c
12 changed files with 132 additions and 37 deletions

View File

@@ -27,5 +27,6 @@ public class MsgPushGuardChain {
public void checkAll(List<MessageRecordDO> messageRecordList) {
blacklistChecker.check(messageRecordList);
quotaChecker.check(messageRecordList);
rateLimitChecker.check(messageRecordList);
}
}

View File

@@ -4,6 +4,8 @@ package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.service.ratelimit.RateLimitConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@@ -15,8 +17,12 @@ import java.util.List;
*/
@Component
public class RateLimitChecker implements IChecker {
@Autowired
private RateLimitConfigService rateLimitConfigService;
@Override
public void check(List<MessageRecordDO> messageRecordList) {
rateLimitConfigService.check(messageRecordList);
}
}

View File

@@ -25,6 +25,7 @@ public class UniPushAppPushSender implements AppPushSender {
private Sender sender;
private ApiHelper apiHelper;
private PushApi pushApi;
public UniPushAppPushSender(UniPushAppPushSetting uniPushAppPushSetting, Sender sender) {
// this.uniPushAppPushSetting = uniPushAppPushSetting;
@@ -36,13 +37,14 @@ public class UniPushAppPushSender implements AppPushSender {
gtApiConfiguration.setMasterSecret(uniPushAppPushSetting.getMasterSecret());
gtApiConfiguration.setDomain("https://restapi.getui.com/v2/");
this.apiHelper = ApiHelper.build(gtApiConfiguration);
this.pushApi = apiHelper.creatApi(PushApi.class);
}
@Override
public boolean appPush(MessageRecordDO message) {
PushDTO<Audience> pushDTO = this.buildPushDTO(message.getTitle(), message.getContent());
// 进行cid单推
PushApi pushApi = apiHelper.creatApi(PushApi.class);
// 设置接收人信息
Audience audience = new Audience();
audience.addCid(message.getReceiver());

View File

@@ -33,7 +33,7 @@ public class MessageRecordController {
@PermitAll
@PostMapping("/send")
@Operation(summary = "消息推送")
@Idempotent(timeout = 60)
@Idempotent(timeout = 2)
public CommonResult<List<MessageSendResultVO>> send(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) {
return success(messageRecordService.send(reqVOList));
}

View File

@@ -24,7 +24,7 @@ public class MessageRetryRedisDAO {
/**
* Redis中消息重试队列的Key前缀
*/
private static final String RETRY_QUEUE_KEY_PREFIX = "msgpush:retry_queue:";
private static final String RETRY_QUEUE_KEY_PREFIX = "msPpush:retry_queue:";
/**
* 获取指定渠道的重试队列Key

View File

@@ -0,0 +1,49 @@
package com.njcn.msgpush.module.push.dal.redis;
import cn.hutool.core.util.ObjectUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
/**
* @author caozehui
* @data 2026-03-25
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RateLimitRedisDAO {
private final RedisTemplate<String, String> redisTemplate;
private final String RATE_LIMIT_KEY_PREFIX = "msgPush:rate_limit";
private String buildKey(String channel, String appName, String receiver) {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMdd");
return RATE_LIMIT_KEY_PREFIX + channel + ":" + appName + ":" + receiver + ":" + LocalDate.now().format(dtf);
}
public void set(String channel, String appName, String receiver) {
String key = buildKey(channel, appName, receiver);
long now = System.currentTimeMillis();
long tomorrowZero = LocalDate.now().plusDays(1).atStartOfDay().toInstant(ZoneOffset.of("+8")).toEpochMilli();
long ttl = tomorrowZero - now;
Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, "1", ttl, TimeUnit.MILLISECONDS);
if (!absent) {
Integer oldCount = this.get(channel, appName, receiver);
redisTemplate.opsForValue().set(key, (oldCount + 1) + "", ttl, TimeUnit.MILLISECONDS);
}
}
public Integer get(String channel, String appName, String receiver) {
String key = buildKey(channel, appName, receiver);
String countStr = redisTemplate.opsForValue().get(key);
return ObjectUtil.isNull(countStr) ? 0 : Integer.parseInt(countStr);
}
}

View File

@@ -6,29 +6,32 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RequiredArgsConstructor
public class SystemQuotaRedisDAO {
private final RedisTemplate<String, String> redisTemplate;
private static final String QUOTA_KEY_PREFIX = "msgpush:quota:";
private static final String QUOTA_KEY_PREFIX = "msgPush:quota:";
public void set(String channel, String appName, boolean isSchedule) {
private String buildKey(String channel, String appName) {
return QUOTA_KEY_PREFIX + channel + ":" + appName;
}
public void set(String channel, String appName) {
String key = buildKey(channel, appName);
if (isSchedule) {
redisTemplate.opsForValue().set(key, "0");
} else {
String countStr = redisTemplate.opsForValue().get(key);
Integer count = 0;
if (ObjectUtil.isNull(countStr)) {
count = 1;
} else {
count = Integer.parseInt(countStr) + 1;
}
redisTemplate.opsForValue().set(key, String.valueOf(count));
long now = System.currentTimeMillis();
long tomorrowZero = LocalDate.now().plusDays(1).atStartOfDay().toInstant(ZoneOffset.of("+8")).toEpochMilli();
long ttl = tomorrowZero - now;
Boolean absent = redisTemplate.opsForValue().setIfAbsent(key, "1", ttl, TimeUnit.MILLISECONDS);
if (!absent) {
Integer oldCount = this.get(channel, appName);
redisTemplate.opsForValue().set(key, (oldCount + 1) + "", ttl, TimeUnit.MILLISECONDS);
}
}
@@ -38,7 +41,4 @@ public class SystemQuotaRedisDAO {
return ObjectUtil.isNull(countStr) ? 0 : Integer.parseInt(countStr);
}
private String buildKey(String channel, String appName) {
return QUOTA_KEY_PREFIX + channel + ":" + appName;
}
}

View File

@@ -21,26 +21,26 @@ public class MessageRetryJob {
private final MessageRetryQueueService messageRetryQueueService;
/**
* 定时处理短信重试队列(每3秒执行一次)
* 定时处理短信重试队列(每10秒执行一次)
*/
@Scheduled(fixedRate = 3000)
@Scheduled(fixedRate = 10000)
public void processSmsRetryQueue() {
log.info("开始处理短信重试队列:{}", LocalDateTime.now());
messageRetryQueueService.processRetryBatch("sms");
}
/**
* 定时处理邮件重试队列(每3秒执行一次)
* 定时处理邮件重试队列(每10秒执行一次)
*/
@Scheduled(fixedRate = 3000)
@Scheduled(fixedRate = 10000)
public void processEmailRetryQueue() {
messageRetryQueueService.processRetryBatch("email");
}
/**
* 定时处理APP推送重试队列3秒执行一次)
* 定时处理APP推送重试队列10秒执行一次)
*/
@Scheduled(fixedRate = 3000)
@Scheduled(fixedRate = 10000)
public void processAppPushRetryQueue() {
messageRetryQueueService.processRetryBatch("app_push");
}

View File

@@ -18,6 +18,7 @@ import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfig
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryHistoryDO;
import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper;
import com.njcn.msgpush.module.push.dal.redis.RateLimitRedisDAO;
import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
@@ -52,6 +53,9 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
@Autowired
private SystemQuotaRedisDAO systemQuotaRedisDAO;
@Autowired
private RateLimitRedisDAO rateLimitRedisDAO;
@Override
@Transactional(rollbackFor = Exception.class)
public List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList) {
@@ -81,6 +85,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
messageSendResultVO.setDetail("配额超限");
}
resultList.add(messageSendResultVO);
this.updateMessage(messageRecordDO);
continue;
}
@@ -114,7 +119,8 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId()));
messageRetryHistoryService.add(messageRetryHistoryDO);
// 更新配额
systemQuotaRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName(),false);
systemQuotaRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName());
rateLimitRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName(), messageRecordDO.getReceiver());
}
return resultList;
}

View File

@@ -8,7 +8,6 @@ import com.njcn.msgpush.module.push.dal.dataobject.quota.SystemQuotaConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.quota.SystemQuotaConfigMapper;
import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.List;
@@ -27,6 +26,7 @@ public class SystemQuotaConfigServiceImpl extends ServiceImpl<SystemQuotaConfigM
return this.lambdaQuery()
.eq(SystemQuotaConfigDO::getChannel, channel)
.eq(SystemQuotaConfigDO::getAppName, appName)
.eq(SystemQuotaConfigDO::getEnabled, true)
.eq(SystemQuotaConfigDO::getDeleted, false)
.one();
}
@@ -46,12 +46,4 @@ public class SystemQuotaConfigServiceImpl extends ServiceImpl<SystemQuotaConfigM
}
}
}
@Scheduled(cron = "0 0 0 * * ?")
public void resetDailyQuota() {
List<SystemQuotaConfigDO> list = this.lambdaQuery().eq(SystemQuotaConfigDO::getEnabled, true).eq(SystemQuotaConfigDO::getDeleted, false).list();
for (SystemQuotaConfigDO systemQuotaConfigDO : list) {
systemQuotaRedisDAO.set(systemQuotaConfigDO.getChannel(), systemQuotaConfigDO.getAppName(), true);
}
}
}

View File

@@ -15,5 +15,10 @@ import java.util.List;
public interface RateLimitConfigService extends IService<RateLimitConfigDO> {
Page<RateLimitConfigDO> getPage(RateLimitConfigReqVO reqVO);
RateLimitConfigDO getByChannelAndAppName(String channel, String appName);
boolean delete(List<Long> ids);
void check(List<MessageRecordDO> messageRecordList);
}

View File

@@ -1,13 +1,18 @@
package com.njcn.msgpush.module.push.service.ratelimit;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.util.object.PageUtils;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.controller.admin.ratelimit.VO.RateLimitConfigReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.ratelimit.RateLimitConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.ratelimit.RateLimitConfigMapper;
import com.njcn.msgpush.module.push.dal.redis.RateLimitRedisDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@@ -18,6 +23,10 @@ import java.util.List;
*/
@Service
public class RateLimitConfigServiceImpl extends ServiceImpl<RateLimitConfigMapper, RateLimitConfigDO> implements RateLimitConfigService {
@Autowired
private RateLimitRedisDAO rateLimitRedisDAO;
@Override
public Page<RateLimitConfigDO> getPage(RateLimitConfigReqVO reqVO) {
QueryWrapper<RateLimitConfigDO> wrapper = new QueryWrapper<>();
@@ -25,6 +34,15 @@ public class RateLimitConfigServiceImpl extends ServiceImpl<RateLimitConfigMappe
return this.page(new Page<>(PageUtils.getPageNum(reqVO), PageUtils.getPageSize(reqVO)), wrapper);
}
@Override
public RateLimitConfigDO getByChannelAndAppName(String channel, String appName) {
return this.lambdaQuery().eq(RateLimitConfigDO::getChannel, channel)
.eq(RateLimitConfigDO::getAppName, appName)
.eq(RateLimitConfigDO::getEnabled, true)
.eq(RateLimitConfigDO::getDeleted, false).one();
}
@Override
public boolean delete(List<Long> ids) {
return this.lambdaUpdate()
@@ -32,4 +50,20 @@ public class RateLimitConfigServiceImpl extends ServiceImpl<RateLimitConfigMappe
.in(RateLimitConfigDO::getId, ids)
.update();
}
@Override
public void check(List<MessageRecordDO> messageRecordList) {
for (int i = 0; i < messageRecordList.size(); i++) {
MessageRecordDO messageRecordDO = messageRecordList.get(i);
RateLimitConfigDO rateLimitConfigDO = this.getByChannelAndAppName(messageRecordDO.getChannel(), messageRecordDO.getAppName());
if (ObjectUtil.isNotNull(rateLimitConfigDO)) {
Integer dailyQuota = rateLimitConfigDO.getDailyLimit();
Integer count = rateLimitRedisDAO.get(messageRecordDO.getChannel(), messageRecordDO.getAppName(), messageRecordDO.getReceiver());
if (count >= dailyQuota) {
messageRecordDO.setStatus(MsgStatusConstant.QUOTAEXCEEDED);
}
}
}
}
}