系统配额、redis缓存各个系统&渠道发送消息的数量

This commit is contained in:
caozehui
2026-03-23 15:39:10 +08:00
parent 6934e19030
commit be7c1b7b26
22 changed files with 359 additions and 165 deletions

View File

@@ -1,6 +1,7 @@
package com.njcn.msgpush.module.push.checker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import java.util.List;
@@ -11,5 +12,5 @@ import java.util.List;
*/
public interface IChecker {
void check(List<MessageRecordReqVO> messageRecordReqVOList);
void check(List<MessageRecordDO> messageRecordList);
}

View File

@@ -3,9 +3,10 @@ package com.njcn.msgpush.module.push.checker;
import com.njcn.msgpush.module.push.checker.impl.BlacklistChecker;
import com.njcn.msgpush.module.push.checker.impl.QuotaChecker;
import com.njcn.msgpush.module.push.checker.impl.RateLimitChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
@@ -13,19 +14,18 @@ import java.util.List;
* @data 2026-02-28
* @description 检查链
*/
@Component
public class MsgPushGuardChain {
private final List<IChecker> checkers;
@Autowired
private BlacklistChecker blacklistChecker;
@Autowired
private QuotaChecker quotaChecker;
@Autowired
private RateLimitChecker rateLimitChecker;
public MsgPushGuardChain() {
this.checkers = new ArrayList<>();
this.checkers.add(new BlacklistChecker());
this.checkers.add(new QuotaChecker());
this.checkers.add(new RateLimitChecker());
}
public void checkAll(List<MessageRecordReqVO> messageRecordReqVOList) {
for (IChecker checker : checkers) {
checker.check(messageRecordReqVOList);
}
public void checkAll(List<MessageRecordDO> messageRecordList) {
blacklistChecker.check(messageRecordList);
quotaChecker.check(messageRecordList);
}
}

View File

@@ -1,7 +1,8 @@
package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.service.blacklist.BlacklistService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -19,7 +20,13 @@ public class BlacklistChecker implements IChecker {
private BlacklistService blacklistService;
@Override
public void check(List<MessageRecordReqVO> messageRecordReqVOList) {
blacklistService.check(messageRecordReqVOList);
public void check(List<MessageRecordDO> messageRecordList) {
for (int i = 0; i < messageRecordList.size(); i++) {
MessageRecordDO messageRecordDO = messageRecordList.get(i);
boolean check = blacklistService.check(messageRecordDO);
if (!check) {
messageRecordDO.setStatus(MsgStatusConstant.BLACKLISTED);
}
}
}
}

View File

@@ -2,6 +2,10 @@ package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.service.quota.SystemQuotaConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@@ -10,9 +14,14 @@ import java.util.List;
* @data 2026-02-27
* @description 系统配额检查器
*/
@Component
public class QuotaChecker implements IChecker {
@Override
public void check(List<MessageRecordReqVO> messageRecordReqVOList) {
@Autowired
private SystemQuotaConfigService systemQuotaConfigService;
@Override
public void check(List<MessageRecordDO> messageRecordList) {
systemQuotaConfigService.check(messageRecordList);
}
}

View File

@@ -3,6 +3,8 @@ package com.njcn.msgpush.module.push.checker.impl;
import com.njcn.msgpush.module.push.checker.IChecker;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import org.springframework.stereotype.Component;
import java.util.List;
@@ -11,9 +13,10 @@ import java.util.List;
* @data 2026-02-27
* @description 接收者频率检查器
*/
@Component
public class RateLimitChecker implements IChecker {
@Override
public void check(List<MessageRecordReqVO> messageRecordReqVOList) {
public void check(List<MessageRecordDO> messageRecordList) {
}
}

View File

@@ -11,8 +11,8 @@ import com.aliyun.teautil.models.RuntimeOptions;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.extern.slf4j.Slf4j;
@@ -68,7 +68,7 @@ public class AliyunEmailSender implements EmailSender {
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
message.setLastRetryTime(message.getSendTime());
message.setNextRetryTime(null);
message.setStatus(MessageStatusConstant.SENDING);
message.setStatus(MsgStatusConstant.SENDING);
RuntimeOptions runtimeOptions = new RuntimeOptions();
runtimeOptions.autoretry = true;
@@ -93,13 +93,13 @@ public class AliyunEmailSender implements EmailSender {
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
System.out.println(toJSONString(response));
if (HttpStatus.OK.value() == response.getStatusCode()) {
message.setStatus(MessageStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null);
message.setStatus(MsgStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
return true;
} else {
message.setStatus(MessageStatusConstant.FAILED);
message.setStatus(MsgStatusConstant.FAILED);
message.setErrorCode(response.getStatusCode() + "");
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.getStatusCode() + "");
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
@@ -110,8 +110,8 @@ public class AliyunEmailSender implements EmailSender {
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}

View File

@@ -11,8 +11,8 @@ import com.aliyun.teautil.models.RuntimeOptions;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.extern.slf4j.Slf4j;
@@ -62,7 +62,7 @@ public class AliyunSmsSender implements SmsSender {
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
message.setLastRetryTime(message.getSendTime());
message.setNextRetryTime(null);
message.setStatus(MessageStatusConstant.SENDING);
message.setStatus(MsgStatusConstant.SENDING);
RuntimeOptions runtimeOptions = new RuntimeOptions();
// 设置自动重试默认是不开启的。重试次数默认是3次
runtimeOptions.autoretry = true;
@@ -82,12 +82,12 @@ public class AliyunSmsSender implements SmsSender {
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
System.out.println(toJSONString(response));
if (HttpStatus.OK.value() == response.getStatusCode()) {
message.setStatus(MessageStatusConstant.SUCCESS);
message.setStatus(MsgStatusConstant.SUCCESS);
this.getDownInfo(response.body.bizId, message);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
return true;
} else {
message.setStatus(MessageStatusConstant.FAILED);
message.setStatus(MsgStatusConstant.FAILED);
message.setErrorCode(response.body.code);
message.setErrorMsg(response.body.message);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.body.code);
@@ -151,7 +151,7 @@ public class AliyunSmsSender implements SmsSender {
// if (response.statusCode != HttpStatus.OK.value()) {
response.body.smsSendDetailDTOs.smsSendDetailDTO.forEach(detail -> {
if (!"DELIVERED".equals(detail.errCode)) {
message.setStatus(MessageStatusConstant.FAILED);
message.setStatus(MsgStatusConstant.FAILED);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), detail.errCode);
message.setErrorCode(detail.errCode);
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
@@ -162,13 +162,13 @@ public class AliyunSmsSender implements SmsSender {
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
} else {
message.setStatus(MessageStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null);
message.setStatus(MsgStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
}
});

View File

@@ -7,8 +7,8 @@ import com.alibaba.fastjson.JSONObject;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.TelecomSmsSetting;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.Data;
@@ -85,7 +85,7 @@ public class TelecomSmsSender implements SmsSender {
Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
message.setLastRetryTime(message.getSendTime());
message.setNextRetryTime(null);
message.setStatus(MessageStatusConstant.SENDING);
message.setStatus(MsgStatusConstant.SENDING);
// 构建请求参数
Map<String, Object> request = new HashMap<>();
boolean isTemplateSend = StrUtil.isNotBlank(message.getTemplateCode());
@@ -137,7 +137,7 @@ public class TelecomSmsSender implements SmsSender {
return true;
} else {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + "");
message.setStatus(MessageStatusConstant.FAILED);
message.setStatus(MsgStatusConstant.FAILED);
message.setErrorCode(telecomSmsSendResponse.list.get(0).result + "");
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
@@ -264,7 +264,7 @@ public class TelecomSmsSender implements SmsSender {
TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class);
TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0);
if (telecomSmsSelectDetailRes.getStatus() == 5) {
message.setStatus(MessageStatusConstant.FAILED);
message.setStatus(MsgStatusConstant.FAILED);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat());
message.setErrorCode(telecomSmsSelectDetailRes.getStat());
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
@@ -274,16 +274,15 @@ public class TelecomSmsSender implements SmsSender {
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
//this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg(), message.getLastRetryTime(), message.getNextRetryTime());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
}
if (telecomSmsSelectDetailRes.getStatus() == 4) {
message.setStatus(MessageStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null);
message.setStatus(MsgStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null, message.getLastRetryTime(), null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MsgStatusConstant.SUCCESS, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
}

View File

@@ -4,7 +4,7 @@ package com.njcn.msgpush.module.push.constant;
* @author caozehui
* @data 2026-02-26
*/
public class MessageStatusConstant {
public class MsgStatusConstant {
//待发送
public static final String PENDING = "pending";
//发送中

View File

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.framework.common.pojo.CommonResult;
import com.njcn.msgpush.framework.idempotent.core.annotation.Idempotent;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import io.swagger.v3.oas.annotations.Operation;
@@ -33,7 +34,7 @@ public class MessageRecordController {
@PostMapping("/send")
@Operation(summary = "消息推送")
@Idempotent(timeout = 60)
public CommonResult<Boolean> send(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) {
public CommonResult<List<MessageSendResultVO>> send(@Valid @RequestBody List<MessageRecordReqVO> reqVOList) {
return success(messageRecordService.send(reqVOList));
}

View File

@@ -0,0 +1,21 @@
package com.njcn.msgpush.module.push.controller.admin.message.vo;
import lombok.Data;
/**
* @author caozehui
* @data 2026-03-23
*/
@Data
public class MessageSendResultVO {
private String messageId;
/**
* 发送结果
*/
private Boolean result;
/**
* 详情。成功为success失败为黑名单拦截、配额超限、调用第三方服务发送消息失败
*/
private String detail;
}

View File

@@ -0,0 +1,40 @@
package com.njcn.msgpush.module.push.dal.dataobject.quota;
import com.baomidou.mybatisplus.annotation.TableName;
import com.njcn.msgpush.framework.mybatis.core.dataobject.BaseDO;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author caozehui
* @data 2026-03-23
*/
@Data
@TableName("push_system_quota_config")
@EqualsAndHashCode(callSuper = true)
public class SystemQuotaConfigDO extends BaseDO {
/**
* 主键 ID
*/
private Long id;
/**
* 渠道类型sms/email/app_push
*/
private String channel;
/**
* 来源系统
*/
private String appName;
/**
* 系统每日总配额
*/
private Integer dailyQuota;
/**
* 是否启用0-否 1-是
*/
private Boolean enabled;
}

View File

@@ -0,0 +1,14 @@
package com.njcn.msgpush.module.push.dal.mysql.quota;
import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
import com.njcn.msgpush.module.push.dal.dataobject.quota.SystemQuotaConfigDO;
import org.apache.ibatis.annotations.Mapper;
/**
* @author caozehui
* @data 2026-03-23
*/
@Mapper
public interface SystemQuotaConfigMapper extends BaseMapperX<SystemQuotaConfigDO> {
}

View File

@@ -0,0 +1,44 @@
package com.njcn.msgpush.module.push.dal.redis;
import cn.hutool.core.util.ObjectUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class SystemQuotaRedisDAO {
private final RedisTemplate<String, String> redisTemplate;
private static final String QUOTA_KEY_PREFIX = "msgpush:quota:";
public void set(String channel, String appName, boolean isSchedule) {
String key = buildKey(channel, appName);
if (isSchedule) {
redisTemplate.opsForValue().set(key, "0");
} else {
String countStr = redisTemplate.opsForValue().get(key);
Integer count = 0;
if (ObjectUtil.isNull(countStr)) {
count = 1;
} else {
count = Integer.parseInt(countStr) + 1;
}
redisTemplate.opsForValue().set(key, String.valueOf(count));
}
}
public Integer get(String channel, String appName) {
String key = buildKey(channel, appName);
String countStr = redisTemplate.opsForValue().get(key);
return ObjectUtil.isNull(countStr) ? 0 : Integer.parseInt(countStr);
}
private String buildKey(String channel, String appName) {
return QUOTA_KEY_PREFIX + channel + ":" + appName;
}
}

View File

@@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import java.util.List;
@@ -20,7 +21,7 @@ public interface BlacklistService extends IService<BlacklistDO> {
/**
* 进行黑名单检查
*
* @param messageRecordReqVOList
* @param messageRecordDO
*/
void check(List<MessageRecordReqVO> messageRecordReqVOList);
boolean check(MessageRecordDO messageRecordDO);
}

View File

@@ -9,6 +9,7 @@ import com.njcn.msgpush.framework.common.util.object.PageUtils;
import com.njcn.msgpush.module.push.controller.admin.blacklist.vo.BlacklistReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.blacklist.BlacklistDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.mysql.blacklist.BlacklistMapper;
import org.springframework.stereotype.Service;
@@ -34,24 +35,19 @@ public class BlacklistServiceImpl extends ServiceImpl<BlacklistMapper, Blacklist
}
@Override
public void check(List<MessageRecordReqVO> messageRecordReqVOList) {
List<BlacklistDO> exists = new ArrayList<>();
for (int i = messageRecordReqVOList.size() - 1; i >= 0; i--) {
MessageRecordReqVO messageRecordReqVO = messageRecordReqVOList.get(i);
String receiver = messageRecordReqVO.getReceiver();
String[] split = receiver.split(String.valueOf(StrUtil.C_COMMA));
BlacklistDO blacklistDO = this.lambdaQuery()
.eq(BlacklistDO::getChannel, messageRecordReqVO.getChannel())
.in(split.length > 0, BlacklistDO::getTarget, split)
.eq(BlacklistDO::getDeleted, false).one();
public boolean check(MessageRecordDO messageRecordDO) {
String receiver = messageRecordDO.getReceiver();
String[] split = receiver.split(String.valueOf(StrUtil.C_COMMA));
BlacklistDO blacklistDO = this.lambdaQuery()
.eq(BlacklistDO::getChannel, messageRecordDO.getChannel())
.in(split.length > 0, BlacklistDO::getTarget, split)
.eq(BlacklistDO::getDeleted, false).one();
if (ObjectUtil.isNotNull(blacklistDO)) {
messageRecordReqVOList.remove(i);
blacklistDO.setHitCount(blacklistDO.getHitCount() + 1);
exists.add(blacklistDO);
}
if (ObjectUtil.isNotNull(blacklistDO)) {
blacklistDO.setHitCount(blacklistDO.getHitCount() + 1);
this.updateById(blacklistDO);
return false;
}
// 更新
this.updateBatchById(exists);
return true;
}
}

View File

@@ -2,6 +2,7 @@ package com.njcn.msgpush.module.push.service.message;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import java.io.Serializable;
@@ -15,17 +16,17 @@ public interface MessageRecordService {
* 发送消息包括email、sms、app_push
*
* @param reqVOList
* @return 发送是否成功的结果
* @return 发送的结果
*/
boolean send(List<MessageRecordReqVO> reqVOList);
List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList);
/**
* 处理发送消息
*
* @param messageRecordDO
* @return 发送是否成功的结果
* @param messageRecordDOList
* @return 发送的结果
*/
boolean processSendMsg(MessageRecordDO messageRecordDO);
List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList);
/**
* 添加消息记录

View File

@@ -8,14 +8,17 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.util.object.PageUtils;
import com.njcn.msgpush.module.push.checker.MsgPushGuardChain;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryHistoryDO;
import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper;
import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryHistoryService;
@@ -25,11 +28,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
@Service
public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, MessageRecordDO> implements MessageRecordService {
@@ -47,52 +46,77 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
@Qualifier("messageProviderFactoryMap")
private Map<String, MessageProviderFactory> messageProviderFactoryMap;
@Autowired
private MsgPushGuardChain msgPushGuardChain;
@Autowired
private SystemQuotaRedisDAO systemQuotaRedisDAO;
@Override
@Transactional(rollbackFor = Exception.class)
public boolean send(List<MessageRecordReqVO> reqVOList) {
public List<MessageSendResultVO> send(List<MessageRecordReqVO> reqVOList) {
List<MessageRecordDO> messageRecordDOList = BeanUtil.copyToList(reqVOList, MessageRecordDO.class);
messageRecordDOList.forEach(messageRecordDO -> {
messageRecordDO.setStatus(MessageStatusConstant.PENDING);
messageRecordDO.setStatus(MsgStatusConstant.PENDING);
});
this.saveBatch(messageRecordDOList);
boolean sendResult = true;
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
sendResult &= this.processSendMsg(messageRecordDO);
}
return sendResult;
return this.processSendMsg(messageRecordDOList);
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean processSendMsg(MessageRecordDO messageRecordDO) {
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType());
if (ObjectUtil.isNull(messageProviderFactory) || ObjectUtil.isNull(channelProviderConfigDO)) {
throw new RuntimeException("暂不支持该供应商或者该供应商未激活:" + messageRecordDO.getProviderType());
}
boolean sendResult = switch (ChannelTypeEnum.getByCode(messageRecordDO.getChannel())) {
case SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO);
case EMAIL -> {
Map<String, Object> params = new HashMap<>();
yield messageProviderFactory.createEmailSender(channelProviderConfigDO, sender).sendEmail(messageRecordDO, params);
public List<MessageSendResultVO> processSendMsg(List<MessageRecordDO> messageRecordDOList) {
msgPushGuardChain.checkAll(messageRecordDOList);
List<MessageSendResultVO> resultList = new ArrayList<>();
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
MessageSendResultVO messageSendResultVO = new MessageSendResultVO();
messageSendResultVO.setMessageId(messageRecordDO.getMessageId());
if (!MsgStatusConstant.PENDING.equals(messageRecordDO.getStatus())) {
messageSendResultVO.setResult(false);
if (messageRecordDO.getStatus().equals(MsgStatusConstant.BLACKLISTED)) {
messageSendResultVO.setDetail("黑名单拦截");
}
if (messageRecordDO.getStatus().equals(MsgStatusConstant.QUOTAEXCEEDED)) {
messageSendResultVO.setDetail("配额超限");
}
resultList.add(messageSendResultVO);
continue;
}
case APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO);
default -> throw new RuntimeException("暂不支持该渠道:" + messageRecordDO.getChannel());
};
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
if (sendResult) {
this.updateMessage(messageRecordDO);
messageRetryHistoryDO.setStatus(MessageStatusConstant.SUCCESS);
} else {
this.updateMessage(messageRecordDO);
messageRetryHistoryDO.setStatus(MessageStatusConstant.FAILED);
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType());
if (ObjectUtil.isNull(messageProviderFactory) || ObjectUtil.isNull(channelProviderConfigDO)) {
throw new RuntimeException("暂不支持该供应商或者该供应商未激活:" + messageRecordDO.getProviderType());
}
boolean sendResult = switch (ChannelTypeEnum.getByCode(messageRecordDO.getChannel())) {
case SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO);
case EMAIL -> {
Map<String, Object> params = new HashMap<>();
yield messageProviderFactory.createEmailSender(channelProviderConfigDO, sender).sendEmail(messageRecordDO, params);
}
case APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO);
default -> throw new RuntimeException("暂不支持该渠道:" + messageRecordDO.getChannel());
};
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
if (sendResult) {
this.updateMessage(messageRecordDO);
messageRetryHistoryDO.setStatus(MsgStatusConstant.SUCCESS);
messageSendResultVO.setResult(true);
} else {
this.updateMessage(messageRecordDO);
messageRetryHistoryDO.setStatus(MsgStatusConstant.FAILED);
messageSendResultVO.setResult(false);
messageSendResultVO.setDetail("调用第三方服务发送消息失败");
}
resultList.add(messageSendResultVO);
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId()));
messageRetryHistoryService.add(messageRetryHistoryDO);
// 更新配额
systemQuotaRedisDAO.set(messageRecordDO.getChannel(), messageRecordDO.getAppName(),false);
}
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId()));
messageRetryHistoryService.add(messageRetryHistoryDO);
return sendResult;
return resultList;
}

View File

@@ -0,0 +1,18 @@
package com.njcn.msgpush.module.push.service.quota;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.quota.SystemQuotaConfigDO;
import java.util.List;
/**
* @author caozehui
* @data 2026-03-23
*/
public interface SystemQuotaConfigService extends IService<SystemQuotaConfigDO> {
SystemQuotaConfigDO getByChannelAndAppName(String channel, String appName);
void check(List<MessageRecordDO> messageRecordList);
}

View File

@@ -0,0 +1,57 @@
package com.njcn.msgpush.module.push.service.quota;
import cn.hutool.core.util.ObjectUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.quota.SystemQuotaConfigDO;
import com.njcn.msgpush.module.push.dal.mysql.quota.SystemQuotaConfigMapper;
import com.njcn.msgpush.module.push.dal.redis.SystemQuotaRedisDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author caozehui
* @data 2026-03-23
*/
@Service
public class SystemQuotaConfigServiceImpl extends ServiceImpl<SystemQuotaConfigMapper, SystemQuotaConfigDO> implements SystemQuotaConfigService {
@Autowired
private SystemQuotaRedisDAO systemQuotaRedisDAO;
@Override
public SystemQuotaConfigDO getByChannelAndAppName(String channel, String appName) {
return this.lambdaQuery()
.eq(SystemQuotaConfigDO::getChannel, channel)
.eq(SystemQuotaConfigDO::getAppName, appName)
.eq(SystemQuotaConfigDO::getDeleted, false)
.one();
}
@Override
public void check(List<MessageRecordDO> messageRecordList) {
for (int i = 0; i < messageRecordList.size(); i++) {
MessageRecordDO messageRecordDO = messageRecordList.get(i);
SystemQuotaConfigDO systemQuotaConfigDO = this.getByChannelAndAppName(messageRecordDO.getChannel(), messageRecordDO.getAppName());
if (ObjectUtil.isNotNull(systemQuotaConfigDO)) {
Integer dailyQuota = systemQuotaConfigDO.getDailyQuota();
Integer count = systemQuotaRedisDAO.get(messageRecordDO.getChannel(), messageRecordDO.getAppName());
if (count >= dailyQuota) {
messageRecordDO.setStatus(MsgStatusConstant.QUOTAEXCEEDED);
}
}
}
}
@Scheduled(cron = "0 0 0 * * ?")
public void resetDailyQuota() {
List<SystemQuotaConfigDO> list = this.lambdaQuery().eq(SystemQuotaConfigDO::getEnabled, true).eq(SystemQuotaConfigDO::getDeleted, false).list();
for (SystemQuotaConfigDO systemQuotaConfigDO : list) {
systemQuotaRedisDAO.set(systemQuotaConfigDO.getChannel(), systemQuotaConfigDO.getAppName(), true);
}
}
}

View File

@@ -5,7 +5,8 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.msgpush.framework.common.pojo.PageResult;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgStatusConstant;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
@@ -21,7 +22,6 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -177,7 +177,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
// 从数据库查询需要重试的消息
// List<MessageRetryQueueDO> retryMessages = retryQueueMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE);
// 从redis中查询需要重试的消息
long epochMilli = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
long epochMilli = System.currentTimeMillis();
Set<String> needRetryMessageIds = messageRetryRedisDAO.getNeedRetryMessageIds(channel, epochMilli, DEFAULT_BATCH_SIZE);
// 没有需要重试的消息
@@ -245,36 +245,8 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
* 处理单个消息的重试逻辑
*/
private void processSingleRetry(MessageRecordDO messageRecordDO) {
// 异步调用消息发送接口进行重试
//CompletableFuture<Boolean> sendFuture = CompletableFuture.supplyAsync(() -> messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR);
// sendFuture.orTimeout(5, TimeUnit.SECONDS)
// .thenAccept(sendResult -> {
// messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
// if (sendResult) {
// log.info("处理消息重试成功逻辑messageId={}", messageRecordDO.getMessageId());
// messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
// this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId()));
//
// } else {
// // 重试失败,更新重试信息
// log.error("处理消息重试失败逻辑messageId={}", messageRecordDO.getMessageId());
// channelProviderConfigService.failureUpdate(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
// handleRetryFailure(messageRecordDO);
// }
// }).exceptionally(ex -> {
// messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
// log.error("异步执行消息重试发生异常messageId={}", messageRecordDO.getMessageId(), ex);
// // 发生异常时也尝试处理失败逻辑,避免消息丢失
// try {
// handleRetryFailure(messageRecordDO);
// } catch (Exception e) {
// log.error("异常处理后再次失败messageId={}", messageRecordDO.getMessageId(), e);
// }
// return null;
// });
boolean sendResult = messageRecordService.processSendMsg(messageRecordDO);
MessageSendResultVO messageSendResultVO = messageRecordService.processSendMsg(Collections.singletonList(messageRecordDO)).get(0);
boolean sendResult = messageSendResultVO.getResult();
messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
if (sendResult) {
@@ -291,15 +263,6 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
messageRecordService.updateMessage(messageRecordDO);
}
/**
* 模拟重试过程(实际应用中应替换为真实的消息发送逻辑)
*/
private boolean simulateRetryProcess(String messageId) {
// 这里应该是调用实际的消息发送服务
// 暂时随机返回成功或失败用于演示
return Math.random() > 0.3; // 70%成功率
}
/**
* 处理重试失败的情况
*/
@@ -308,22 +271,15 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
int newRetryCount = messageRetryQueueDO.getRetryCount() + 1;
if (newRetryCount >= messageRetryQueueDO.getMaxRetry()) {
messageRecordDO.setStatus(MessageStatusConstant.FINALFAILED);
messageRecordDO.setStatus(MsgStatusConstant.FINALFAILED);
messageRecordDO.setNextRetryTime(null);
// 达到最大重试次数,标记为最终失败
// 更新消息的状态为final_failed
//messageRecordService.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.FINALFAILED, messageRecordDO.getErrorCode(), messageRecordDO.getErrorMsg(), messageRecordDO.getSendTime(), null);
// 数据库中不能删除
// retryQueueMapper.deleteByMessageId(messageRecordDO.getMessageId());
// redis中可以删除
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
} else {
// 还可以继续重试,更新重试信息
LocalDateTime nextRetryTime = this.calculateNextRetryTime(messageRecordDO.getChannel(), newRetryCount);
messageRecordDO.setStatus(MessageStatusConstant.FAILED);
messageRecordDO.setStatus(MsgStatusConstant.FAILED);
messageRecordDO.setNextRetryTime(nextRetryTime);
this.updateRetryInfo(

View File

@@ -1,9 +1,11 @@
package com.njcn.msgpush.module.push.sms;
import com.alibaba.fastjson.JSON;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageSendResultVO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.enums.ChannelTypeEnum;
import com.njcn.msgpush.module.push.enums.ProviderTypeEnum;
@@ -65,10 +67,10 @@ public class MsgPushClientTest {
message.setProviderType(ProviderTypeEnum.TELECOM.getCode());
messageIdList.add(message);
}
boolean sendResult = messageRecordService.send(messageIdList);
List<MessageSendResultVO> sendResult = messageRecordService.send(messageIdList);
Thread.sleep(9000);
System.out.println(sendResult);
System.out.println(JSON.toJSONString(sendResult));
}
@Test