电信获取下行信息完善、补充消息重试队列、消息重试任务

This commit is contained in:
caozehui
2026-03-10 10:35:42 +08:00
parent 8806ba7afa
commit 478f8a4c3d
18 changed files with 285 additions and 162 deletions

View File

@@ -18,6 +18,5 @@ import org.springframework.scheduling.annotation.EnableScheduling;
public class PushServerApplication {
public static void main(String[] args) {
SpringApplication.run(PushServerApplication.class, args);
}
}

View File

@@ -24,4 +24,11 @@ public interface SmsSender {
* @return 发送结果
*/
boolean sendBatchSms(List<MessageRecordDO> messageList);
/**
* 查询模板信息
*
* @param templateIdentifier 模板唯一标识符例如模板id、模板code
*/
void queryTemplate(String templateIdentifier);
}

View File

@@ -1,6 +1,8 @@
package com.njcn.msgpush.module.push.client.sender.impl;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dm20151123.Client;
import com.aliyun.dm20151123.models.SingleSendMailRequest;
import com.aliyun.dm20151123.models.SingleSendMailResponse;
@@ -9,13 +11,17 @@ import com.aliyun.teautil.models.RuntimeOptions;
import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.aliyun.teautil.Common.toJSONString;
/**
* @author caozehui
@@ -28,6 +34,9 @@ public class AliyunEmailSender implements EmailSender {
public static final String ACCOUNT_NAME = "accountName";
public static final String REPLY_TO_ADDRESS = "replyToAddress";
public static final String FROM_ALIAS = "fromAlias";
public static final String SUBJECT = "subject";
public static final String HTML_BODY = "htmlBody";
public static final String TEXT_BODY = "textBody";
@@ -55,20 +64,40 @@ public class AliyunEmailSender implements EmailSender {
public boolean sendEmail(MessageRecordDO message, Map<String, Object> params) {
RuntimeOptions runtimeOptions = new RuntimeOptions();
runtimeOptions.autoretry = true;
String extraInfo = message.getExtraInfo();
JSONObject jsonObject = JSON.parseObject(extraInfo);
SingleSendMailRequest request = new SingleSendMailRequest()
.setAccountName(params.get(ACCOUNT_NAME).toString())
.setAccountName(jsonObject.get(ACCOUNT_NAME).toString())
.setAddressType(1)
.setReplyToAddress((boolean) params.get(REPLY_TO_ADDRESS))
.setReplyToAddress((boolean) jsonObject.get(REPLY_TO_ADDRESS))
.setToAddress(message.getReceiver()) //目标地址,多个 email 地址可以用逗号分隔,最多 100 个地址(支持邮件组)。
.setSubject(params.get(SUBJECT).toString())
.setHtmlBody(params.get(HTML_BODY).toString()) //HtmlBody 和 TextBody 是针对不同类型的邮件
.setTextBody(params.get(TEXT_BODY).toString());
.setSubject(message.getTitle())
.setHtmlBody(message.getContent()) //HtmlBody 和 TextBody 是针对不同类型的邮件
.setTextBody("")
.setFromAlias(jsonObject.get(FROM_ALIAS).toString());
try {
LocalDateTime now = LocalDateTime.now();
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
message.setSendTime(now);
SingleSendMailResponse response = this.emailClient.singleSendMailWithOptions(request, runtimeOptions);
LocalDateTime end = LocalDateTime.now();
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
System.out.println(toJSONString(response));
if (HttpStatus.OK.value() == response.getStatusCode()) {
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
return true;
} else {
message.setErrorCode(response.getStatusCode() + "");
ProviderErrorCodeMappingDO providerErrorCode = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.getStatusCode() + "");
if (ObjectUtil.isNotNull(providerErrorCode)) {
message.setErrorMsg(providerErrorCode.getOriginalMessage());
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}
} catch (Exception e) {

View File

@@ -12,9 +12,11 @@ import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -40,11 +42,6 @@ public class AliyunSmsSender implements SmsSender {
private Client smsClient;
/**
* 存放发送完成的消息。key为其返回的mid
*/
// private Map<String, MessageRecordDO> completeSendMessageMap = new HashMap<>();
private ScheduledExecutorService scheduledExecutorService;
public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) {
@@ -84,13 +81,14 @@ public class AliyunSmsSender implements SmsSender {
LocalDateTime end = LocalDateTime.now();
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
System.out.println(toJSONString(response));
if (OK.equals(response.body.code)) {
if (HttpStatus.OK.value() == response.getStatusCode()) {
this.getDownInfo(response.body.bizId, message);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
return true;
} else {
message.setErrorCode(response.body.code);
message.setErrorMsg(response.body.message);
this.sender.messageRetryQueueService.addRetryMessage(message);
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}
@@ -110,14 +108,19 @@ public class AliyunSmsSender implements SmsSender {
}
@Override
public boolean sendBatchSms(List<MessageRecordDO> messageIdList) {
public boolean sendBatchSms(List<MessageRecordDO> messageList) {
boolean res = true;
for (MessageRecordDO message : messageIdList) {
for (MessageRecordDO message : messageList) {
res &= this.sendSms(message);
}
return res;
}
@Override
public void queryTemplate(String templateIdentifier) {
}
/**
* 获取下行信息
*
@@ -143,8 +146,12 @@ public class AliyunSmsSender implements SmsSender {
if (!"DELIVERED".equals(detail.errCode)) {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), detail.errCode);
message.setErrorCode(detail.errCode);
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
this.sender.messageRetryQueueService.addRetryMessage(message);
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
});
// }

View File

@@ -1,11 +1,14 @@
package com.njcn.msgpush.module.push.client.sender.impl;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.client.setting.impl.TelecomSmsSetting;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import lombok.Data;
@@ -24,6 +27,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.alibaba.fastjson.JSON.toJSON;
/**
* @author caozehui
* @data 2026-02-11
@@ -85,12 +90,26 @@ public class TelecomSmsSender implements SmsSender {
message.setStatus(MessageStatusConstant.SENDING);
// 构建请求参数
Map<String, Object> request = new HashMap<>();
request.put("action", "send");
boolean isTemplateSend = StrUtil.isNotBlank(message.getTemplateCode());
request.put("account", telecomSmsSetting.getAccount());
request.put("password", telecomSmsSetting.getPassword());
request.put("mobile", message.getReceiver());
request.put("content", message.getContent());
request.put("extno", telecomSmsSetting.getExtno());
if (isTemplateSend) {
request.put("action", "templatep2p");
Map<String, Object> templateJsonMap = new HashMap<>();
templateJsonMap.put("templateID", message.getTemplateCode());
JSONObject jsonObject = JSON.parseObject(message.getTemplateParams());
Map<String, String> variable = new HashMap<>();
variable.put("mobile", message.getReceiver());
jsonObject.forEach((key, value) -> variable.put(key, value.toString()));
templateJsonMap.put("variable", "[" + toJSON(variable) + "]");
request.put("templateJson", toJSON(templateJsonMap));
} else {
request.put("action", "send");
request.put("mobile", message.getReceiver());
request.put("content", message.getContent());
}
// 设置请求头
HttpHeaders headers = new HttpHeaders();
@@ -119,7 +138,7 @@ public class TelecomSmsSender implements SmsSender {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + "");
message.setErrorCode(telecomSmsSendResponse.list.get(0).result + "");
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
this.sender.messageRetryQueueService.addRetryMessage(message);
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
return false;
}
});
@@ -136,43 +155,71 @@ public class TelecomSmsSender implements SmsSender {
@Override
public boolean sendBatchSms(List<MessageRecordDO> messageList) {
// Map<String, Object> request = new HashMap<>();
// request.put("action", "p2p");
// request.put("account", telecomSmsSetting.getAccount());
// request.put("password", telecomSmsSetting.getPassword());
// Map<String, String> mobileContentKvp = new HashMap<>();
// for (MessageRecordDO message : messageList) {
// mobileContentKvp.put(message.getReceiver(), "【" + message.getTitle() + "】" + message.getContent());
// message.setStatus(MessageStatusConstant.SENDING);
// }
// request.put("mobileContentKvp", JSON.toJSONString(mobileContentKvp));
// request.put("extno", telecomSmsSetting.getExtno());
//
// // 设置请求头
// HttpHeaders headers = new HttpHeaders();
// headers.set("Content-Type", CONTENT_TYPE);
//
// LocalDateTime now = LocalDateTime.now();
// long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
//
// // 发送请求
// ResponseEntity<String> response = this.sender.restTemplateUtil.post(
// telecomSmsSetting.getApiUrl(),
// request,
// headers,
// String.class
// );
// LocalDateTime end = LocalDateTime.now();
// for (MessageRecordDO message : messageList) {
// message.setSendTime(now);
// message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
// }
// TelecomSmsSendResponse telecomSmsSendResponse = JSON.parseObject(response.getBody(), TelecomSmsSendResponse.class);
// boolean res = true;
// for (TelecomSmsSendDetailRes telecomSmsSendDetailRes : telecomSmsSendResponse.list) {
// res &= telecomSmsSendDetailRes.result == 0;
//
// // 定时任务,指定时间间隔后获取下行信息
// //this.getDownInfo(telecomSmsSendDetailRes.getMid(), message);
// }
// return res;
boolean res = true;
for (MessageRecordDO message : messageList) {
res &= this.sendSms(message);
}
return res;
}
@Override
public void queryTemplate(String templateIdentifier) {
Map<String, Object> request = new HashMap<>();
request.put("action", "p2p");
request.put("action", "templateSelect");
request.put("account", telecomSmsSetting.getAccount());
request.put("password", telecomSmsSetting.getPassword());
Map<String, String> mobileContentKvp = new HashMap<>();
for (MessageRecordDO message : messageList) {
mobileContentKvp.put(message.getReceiver(), "" + message.getTitle() + "" + message.getContent());
message.setStatus(MessageStatusConstant.SENDING);
}
request.put("mobileContentKvp", JSON.toJSONString(mobileContentKvp));
request.put("extno", telecomSmsSetting.getExtno());
request.put("templateJson", StrUtil.isBlank(templateIdentifier) ? 0 : templateIdentifier);
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", CONTENT_TYPE);
LocalDateTime now = LocalDateTime.now();
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// 发送请求
ResponseEntity<String> response = this.sender.restTemplateUtil.post(
telecomSmsSetting.getApiUrl(),
request,
headers,
String.class
);
LocalDateTime end = LocalDateTime.now();
for (MessageRecordDO message : messageList) {
message.setSendTime(now);
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
}
TelecomSmsSendResponse telecomSmsSendResponse = JSON.parseObject(response.getBody(), TelecomSmsSendResponse.class);
boolean res = true;
for (TelecomSmsSendDetailRes telecomSmsSendDetailRes : telecomSmsSendResponse.list) {
res &= telecomSmsSendDetailRes.result == 0;
}
return res;
System.out.println(toJSON(response));
}
/**
@@ -215,8 +262,12 @@ public class TelecomSmsSender implements SmsSender {
if (telecomSmsSelectDetailRes.getStatus() == 5) {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat());
message.setErrorCode(telecomSmsSelectDetailRes.getStat());
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
this.sender.messageRetryQueueService.addRetryMessage(message);
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
// }
this.scheduledExecutorService.shutdown();

View File

@@ -12,4 +12,6 @@ public class MsgPushConstant {
public static final String CHANNEL_SMS = "sms";
public static final String CHANNEL_EMAIL = "email";
public static final String CHANNEL_APP_PUSH = "app_push";
public static final String ERROR_MSG_UNKNOWN = "未知错误";
}

View File

@@ -1,8 +1,10 @@
package com.njcn.msgpush.module.push.controller.admin.message.vo;
import cn.hutool.core.lang.RegexPool;
import com.njcn.msgpush.framework.common.pojo.PageParam;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern;
import lombok.Data;
/**
@@ -29,10 +31,12 @@ public class MessageRecordReqVO extends PageParam {
@Schema(description = "消息类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "verify_code/order_notify/marketing/system_notify")
@NotBlank(message = "消息类型不能为空")
// @InEnum(value = CommonStatusEnum.class,message = "消息类型错误")
private String messageType;
@Schema(description = "接收者", requiredMode = Schema.RequiredMode.REQUIRED)
@NotBlank(message = "接收者不能为空")
@Pattern(regexp = RegexPool.EMAIL + "|" + RegexPool.MOBILE, message = "必须是有效的邮箱或手机号格式")
private String receiver;
@Schema(description = "标题", requiredMode = Schema.RequiredMode.REQUIRED)
@@ -53,4 +57,7 @@ public class MessageRecordReqVO extends PageParam {
@Schema(description = "第三方消息ID")
private String thirdPartyId;
@Schema(description = "额外信息")
private String extraInfo;
}

View File

@@ -13,7 +13,7 @@ import java.time.LocalDateTime;
* @description 消息重试队列表对应数据对象
*/
@Data
@TableName("message_retry_queue")
@TableName("push_message_retry_queue")
@EqualsAndHashCode(callSuper = true)
public class MessageRetryQueueDO extends BaseDO {
/**

View File

@@ -20,36 +20,4 @@ public interface MessageRetryQueueMapper extends BaseMapperX<MessageRetryQueueDO
*/
List<MessageRetryQueueDO> selectNeedRetryMessages(@Param("currentTime") LocalDateTime currentTime,
@Param("limit") int limit);
/**
* 更新重试信息
*
* @param messageId 消息ID
* @param retryCount 重试次数
* @param nextRetryTime 下次重试时间
* @param lastRetryTime 最后重试时间
* @param lastErrorMsg 最后错误信息
* @return 影响行数
*/
int updateRetryInfo(@Param("messageId") String messageId,
@Param("retryCount") Integer retryCount,
@Param("nextRetryTime") LocalDateTime nextRetryTime,
@Param("lastRetryTime") LocalDateTime lastRetryTime,
@Param("lastErrorMsg") String lastErrorMsg);
/**
* 删除成功的重试记录
*
* @param messageId 消息ID
* @return 影响行数
*/
int deleteByMessageId(@Param("messageId") String messageId);
/**
* 批量删除多个消息ID的重试记录
*
* @param messageIds 消息ID列表
* @return 影响行数
*/
int deleteByMessageIds(@Param("messageIds") List<String> messageIds);
}

View File

@@ -4,34 +4,11 @@
<select id="selectNeedRetryMessages" resultType="com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO">
SELECT *
FROM message_retry_queue
WHERE next_retry_time <![CDATA[ <= ]]> #{currentTime} and deleted != 0
FROM push_message_retry_queue
WHERE next_retry_time <![CDATA[ <= ]]> #{currentTime} AND deleted = 0
ORDER BY next_retry_time ASC
LIMIT #{limit}
</select>
<update id="updateRetryInfo">
UPDATE message_retry_queue
SET retry_count = #{retryCount},
next_retry_time = #{nextRetryTime},
last_retry_time = #{lastRetryTime},
last_error_msg = #{lastErrorMsg},
update_time = NOW()
WHERE message_id = #{messageId} and deleted != 0
</update>
<delete id="deleteByMessageId">
DELETE
FROM message_retry_queue
WHERE message_id = #{messageId}
</delete>
<delete id="deleteByMessageIds">
DELETE FROM message_retry_queue
WHERE message_id IN
<foreach collection="messageIds" item="messageId" open="(" separator="," close=")">
#{messageId}
</foreach>
</delete>
</mapper>

View File

@@ -33,6 +33,7 @@ public class MessageRetryJob {
*/
@Scheduled(fixedRate = 3000)
public void processEmailRetryQueue() {
log.info("开始处理邮件重试队列:{}", LocalDateTime.now());
messageRetryQueueService.processRetryBatch("email");
}

View File

@@ -30,4 +30,6 @@ public interface ChannelProviderConfigService extends IService<ChannelProviderCo
ChannelProviderConfigDO getByTypeAndChannel(String providerType, String channel);
void failureUpdate(String providerType, String channel);
void successUpdate(String providerType, String channel);
}

View File

@@ -51,4 +51,13 @@ public class ChannelProviderConfigServiceImpl extends ServiceImpl<ChannelProvide
byTypeAndChannel.setHealthStatus(byTypeAndChannel.getFailureCount() > 5 ? 0 : 1);
this.updateById(byTypeAndChannel);
}
@Override
public void successUpdate(String providerType, String channel) {
ChannelProviderConfigDO byTypeAndChannel = this.getByTypeAndChannel(providerType, channel);
byTypeAndChannel.setFailureCount(0);
byTypeAndChannel.setLastFailureTime(null);
byTypeAndChannel.setHealthStatus(1);
this.updateById(byTypeAndChannel);
}
}

View File

@@ -53,4 +53,12 @@ public interface MessageRecordService {
* @return
*/
boolean updateStatus(String messageId, String status);
/**
* 更新消息记录重试次数
*
* @param messageId
* @return
*/
boolean updateRetryCount(String messageId);
}

View File

@@ -1,6 +1,7 @@
package com.njcn.msgpush.module.push.service.message;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -66,22 +67,28 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
public boolean processSendMsg(MessageRecordDO messageRecordDO) {
ChannelProviderConfigDO channelProviderConfigDO = channelProviderConfigService.getByTypeAndChannel(messageRecordDO.getProviderType(), messageRecordDO.getChannel());
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(messageRecordDO.getProviderType());
if (ObjectUtil.isNull(messageProviderFactory)) {
if (ObjectUtil.isNull(messageProviderFactory) || ObjectUtil.isNull(channelProviderConfigDO)) {
throw new RuntimeException("暂不支持该供应商或者该供应商未激活:" + messageRecordDO.getProviderType());
}
boolean sendResult = switch (messageRecordDO.getChannel()) {
case MsgPushConstant.CHANNEL_SMS -> messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO);
case MsgPushConstant.CHANNEL_EMAIL ->
messageProviderFactory.createEmailSender(channelProviderConfigDO, sender).sendEmail(messageRecordDO, new HashMap<>());
case MsgPushConstant.CHANNEL_EMAIL -> {
Map<String, Object> params = new HashMap<>();
yield messageProviderFactory.createEmailSender(channelProviderConfigDO, sender).sendEmail(messageRecordDO, params);
}
case MsgPushConstant.CHANNEL_APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO);
default -> throw new RuntimeException("暂不支持该渠道:" + messageRecordDO.getChannel());
};
this.updateStatus(messageRecordDO.getMessageId(), sendResult ? MessageStatusConstant.SUCCESS : MessageStatusConstant.FAILED);
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
if (sendResult) {
this.updateStatus(messageRecordDO.getMessageId(), MessageStatusConstant.SUCCESS);
} else {
this.updateStatus(messageRecordDO.getMessageId(), MessageStatusConstant.FAILED);
}
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId()));
messageRetryHistoryService.add(messageRetryHistoryDO);
return sendResult;
}
@@ -91,6 +98,13 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
return this.lambdaUpdate().eq(MessageRecordDO::getMessageId, messageId).set(MessageRecordDO::getStatus, status).update();
}
@Override
public boolean updateRetryCount(String messageId) {
MessageRecordDO one = this.lambdaQuery().eq(MessageRecordDO::getMessageId, messageId).one();
one.setRetryCount(one.getRetryCount() + 1);
return this.updateById(one);
}
@Override
public MessageRecordDO getById(String messageId) {
return this.lambdaQuery().eq(MessageRecordDO::getMessageId, messageId).eq(MessageRecordDO::getDeleted, true).one();
@@ -104,7 +118,7 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
@Override
public List<MessageRecordDO> listByIds(Collection<? extends Serializable> ids) {
return this.baseMapper.selectByIds(ids);
return this.lambdaQuery().in(CollectionUtil.isNotEmpty(ids), MessageRecordDO::getMessageId, ids).list();
}
@Override

View File

@@ -5,6 +5,7 @@ import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueR
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO;
import java.time.LocalDateTime;
import java.util.List;
public interface MessageRetryQueueService {
@@ -13,7 +14,7 @@ public interface MessageRetryQueueService {
*
* @param message 消息
*/
void addRetryMessage(MessageRecordDO message);
void saveOrUpdateRetryMessage(MessageRecordDO message);
/**
* 批量处理重试消息
@@ -30,13 +31,6 @@ public interface MessageRetryQueueService {
*/
void manualRetry(List<String> messageIds);
/**
* 移除重试记录
*
* @param messageId 消息ID
* @return 是否成功
*/
boolean removeRetryRecord(String messageId);
/**
* 分页查询重试队列
@@ -45,4 +39,17 @@ public interface MessageRetryQueueService {
* @return 分页结果
*/
PageResult<MessageRetryQueueDO> getPage(MessageRetryQueueReqVO reqVO);
/**
* 更新重试信息
*
* @param messageId 消息ID
* @param retryCount 重试次数
* @param nextRetryTime 下次重试时间
* @param lastRetryTime 最后重试时间
* @param lastErrorMsg 最后错误信息
*/
boolean updateRetryInfo(String messageId, int retryCount, LocalDateTime nextRetryTime, LocalDateTime lastRetryTime, String lastErrorMsg);
boolean deleteByMessageIds(List<String> messageIds);
}

View File

@@ -22,6 +22,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -40,6 +41,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
@Autowired
public RetryStrategyConfigService retryStrategyConfigService;
public final ThreadPoolExecutor MSG_RETRY_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
5,
5,
@@ -68,11 +70,11 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
/**
* 默认最大重试次数
*/
private static final int DEFAULT_MAX_RETRY_COUNT = 3;
private static final int DEFAULT_MAX_RETRY_COUNT = 5;
@Override
@Transactional(rollbackFor = Exception.class)
public void addRetryMessage(MessageRecordDO message) {
public void saveOrUpdateRetryMessage(MessageRecordDO message) {
// 检查是否已存在重试记录
MessageRetryQueueDO existing = super.baseMapper.selectOne(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<MessageRetryQueueDO>()
@@ -86,20 +88,30 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
retryRecord.setMessageId(message.getMessageId());
retryRecord.setChannel(message.getChannel());
retryRecord.setReceiver(message.getReceiver());
retryRecord.setRetryCount(0);
retryRecord.setMaxRetry(message.getRetryCount());
retryRecord.setFirstFailTime(LocalDateTime.now());
retryRecord.setRetryCount(1);
RetryStrategyConfigDO strategyConfig = retryStrategyConfigService.getStrategyConfig(message.getChannel());
if (ObjectUtil.isNull(strategyConfig)) {
retryRecord.setMaxRetry(DEFAULT_MAX_RETRY_COUNT);
} else {
retryRecord.setMaxRetry(strategyConfig.getMaxRetryCount());
}
retryRecord.setFirstFailTime(message.getSendTime());
retryRecord.setLastErrorMsg(message.getErrorMsg());
// 计算下次重试时间
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), 1);
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), retryRecord.getRetryCount());
retryRecord.setNextRetryTime(nextRetryTime);
super.baseMapper.insert(retryRecord);
// 同步到Redis
messageRetryRedisDAO.addToRetryQueue(message);
message.setNextRetryTime(nextRetryTime);
this.save(retryRecord);
} else {
existing.setRetryCount(existing.getRetryCount() + 1);
// 计算下次重试时间
LocalDateTime nextRetryTime = this.calculateNextRetryTime(message.getChannel(), existing.getRetryCount());
existing.setNextRetryTime(nextRetryTime);
message.setNextRetryTime(nextRetryTime);
this.updateById(existing);
}
// 同步到Redis
messageRetryRedisDAO.addToRetryQueue(message);
}
/**
@@ -127,7 +139,8 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
break;
case MsgPushConstant.CHANNEL_EMAIL: {
if (retryCount == 1) {
plusSeconds = 60 * 10;
// plusSeconds = 60 * 10;
plusSeconds = 60 * 2;
} else if (retryCount == 2) {
plusSeconds = 60 * 30;
} else if (retryCount == 3) {
@@ -167,7 +180,8 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
// 从数据库查询需要重试的消息
// List<MessageRetryQueueDO> retryMessages = retryQueueMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE);
// 从redis中查询需要重试的消息
Set<String> needRetryMessageIds = messageRetryRedisDAO.getNeedRetryMessageIds(channel, LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), DEFAULT_BATCH_SIZE);
long epochMilli = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
Set<String> needRetryMessageIds = messageRetryRedisDAO.getNeedRetryMessageIds(channel, epochMilli, DEFAULT_BATCH_SIZE);
// 没有需要重试的消息
if (needRetryMessageIds.isEmpty()) {
@@ -175,6 +189,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
}
List<MessageRecordDO> messageRecordDOList = messageRecordService.listByIds(needRetryMessageIds);
System.out.println("messageRecordDOList.size()=:" + messageRecordDOList.size());
for (MessageRecordDO messageRecordDO : messageRecordDOList) {
processSingleRetry(messageRecordDO);
@@ -204,27 +219,21 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean removeRetryRecord(String messageId) {
MessageRetryQueueDO retryRecord = super.baseMapper.selectOne(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<MessageRetryQueueDO>()
.eq(MessageRetryQueueDO::getMessageId, messageId)
);
if (retryRecord == null) {
log.warn("未找到重试记录: messageId={}", messageId);
return false;
}
// 从数据库删除
super.baseMapper.deleteByMessageId(messageId);
// 从Redis删除
messageRetryRedisDAO.removeFromRetryQueue(retryRecord.getChannel(), messageId);
log.info("删除重试记录成功: messageId={}", messageId);
return true;
public boolean updateRetryInfo(String messageId, int retryCount, LocalDateTime nextRetryTime, LocalDateTime lastRetryTime, String s) {
return this.lambdaUpdate()
.set(MessageRetryQueueDO::getRetryCount, retryCount)
.set(MessageRetryQueueDO::getNextRetryTime, nextRetryTime)
.set(MessageRetryQueueDO::getLastRetryTime, lastRetryTime)
.eq(MessageRetryQueueDO::getMessageId, messageId)
.update();
}
@Override
public boolean deleteByMessageIds(List<String> messageIds) {
return this.lambdaUpdate()
.set(MessageRetryQueueDO::getDeleted, true)
.in(CollUtil.isNotEmpty(messageIds), MessageRetryQueueDO::getMessageId, messageIds)
.update();
}
/**
@@ -232,16 +241,15 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
*/
private void processSingleRetry(MessageRecordDO messageRecordDO) {
// 异步调用消息发送接口进行重试
CompletableFuture<Boolean> sendFuture = CompletableFuture.supplyAsync(() ->
messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR
);
CompletableFuture<Boolean> sendFuture = CompletableFuture.supplyAsync(() -> messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR);
sendFuture.orTimeout(5, TimeUnit.SECONDS)
.thenAccept(sendResult -> {
messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
if (sendResult) {
log.info("处理消息重试成功逻辑messageId={}", messageRecordDO.getMessageId());
super.baseMapper.deleteByMessageId(messageRecordDO.getMessageId());
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId()));
} else {
// 重试失败,更新重试信息
log.error("处理消息重试失败逻辑messageId={}", messageRecordDO.getMessageId());
@@ -249,6 +257,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
handleRetryFailure(messageRecordDO);
}
}).exceptionally(ex -> {
messageRecordService.updateRetryCount(messageRecordDO.getMessageId());
log.error("异步执行消息重试发生异常messageId={}", messageRecordDO.getMessageId(), ex);
// 发生异常时也尝试处理失败逻辑,避免消息丢失
try {
@@ -273,7 +282,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
* 处理重试失败的情况
*/
private void handleRetryFailure(MessageRecordDO messageRecordDO) {
MessageRetryQueueDO messageRetryQueueDO = this.getById(messageRecordDO.getMessageId());
MessageRetryQueueDO messageRetryQueueDO = this.lambdaQuery().eq(MessageRetryQueueDO::getMessageId, messageRecordDO.getMessageId()).one();
int newRetryCount = messageRetryQueueDO.getRetryCount() + 1;
if (newRetryCount >= messageRetryQueueDO.getMaxRetry()) {
@@ -290,7 +299,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
// 还可以继续重试,更新重试信息
LocalDateTime nextRetryTime = this.calculateNextRetryTime(messageRecordDO.getChannel(), newRetryCount);
super.baseMapper.updateRetryInfo(
this.updateRetryInfo(
messageRecordDO.getMessageId(),
newRetryCount,
nextRetryTime,

View File

@@ -1,17 +1,23 @@
package com.njcn.msgpush.module.push.sms;
import com.njcn.msgpush.module.push.client.factory.MessageProviderFactory;
import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.sender.SmsSender;
import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.service.message.MessageRecordService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
@@ -26,6 +32,13 @@ public class MsgPushClientTest {
@Autowired
private MessageRecordService messageRecordService;
@Autowired
@Qualifier("messageProviderFactoryMap")
private Map<String, MessageProviderFactory> messageProviderFactoryMap;
@Autowired
private Sender sender;
@Test
public void testSend() throws InterruptedException {
List<MessageRecordReqVO> messageIdList = new ArrayList<>();
@@ -56,4 +69,17 @@ public class MsgPushClientTest {
Thread.sleep(9000);
System.out.println(sendResult);
}
@Test
public void templateSelect() {
MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(MsgPushConstant.PROVIDER_TYPE_TELECOM);
ChannelProviderConfigDO config = new ChannelProviderConfigDO();
config.setApiUrl("https://sms.ymeeting.cn/smsv2");
config.setAppKey("925631");
config.setAppSecret("AMW2pOVrdky");
config.setExtno("106905631");
SmsSender smsSender = messageProviderFactory.createSmsSender(config, sender);
String templateIdentifier = "SMS_481710295";
smsSender.queryTemplate(templateIdentifier);
}
}