完善消息错误信息、编写消息重试队列

This commit is contained in:
caozehui
2026-03-12 09:13:43 +08:00
parent 6a0bb5f23e
commit e281226069
16 changed files with 173 additions and 23 deletions

View File

@@ -146,6 +146,22 @@
</executions>
</plugin>
</plugins>
<resources>
<!-- 标准资源文件 -->
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
</resource>
<!-- Java 目录下的 XML 文件MyBatis Mapper -->
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>

View File

@@ -1,7 +1,9 @@
package com.njcn.msgpush.module.push.client.sender;
import com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper;
import com.njcn.msgpush.module.push.service.channel.ChannelProviderConfigService;
import com.njcn.msgpush.module.push.service.channel.ProviderErrorCodeMappingService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryHistoryService;
import com.njcn.msgpush.module.push.service.retry.MessageRetryQueueService;
import com.njcn.msgpush.module.push.util.RestTemplateUtil;
import org.springframework.beans.factory.annotation.Autowired;
@@ -25,6 +27,10 @@ public class Sender {
public ProviderErrorCodeMappingService providerErrorCodeMappingService;
@Autowired
public RestTemplateUtil restTemplateUtil;
@Autowired
public MessageRetryHistoryService messageRetryHistoryService;
@Autowired
public MessageRecordMapper messageRecordMapper;
public final ThreadPoolExecutor MSG_PUSH_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
5,

View File

@@ -92,18 +92,24 @@ public class AliyunEmailSender implements EmailSender {
System.out.println(toJSONString(response));
if (HttpStatus.OK.value() == response.getStatusCode()) {
message.setStatus(MessageStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
return true;
} else {
message.setStatus(MessageStatusConstant.FAILED);
message.setErrorCode(response.getStatusCode() + "");
ProviderErrorCodeMappingDO providerErrorCode = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.getStatusCode() + "");
if (ObjectUtil.isNotNull(providerErrorCode)) {
message.setErrorMsg(providerErrorCode.getOriginalMessage());
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.getStatusCode() + "");
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if (providerErrorCodeMappingDO.getShouldRetry() == 1) {
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}

View File

@@ -77,6 +77,8 @@ public class AliyunSmsSender implements SmsSender {
LocalDateTime now = LocalDateTime.now();
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
message.setSendTime(now);
// 更新上次重试时间
this.sender.messageRetryQueueService.updateLastRetryTime(message.getMessageId(), now);
SendSmsResponse response = this.smsClient.sendSmsWithOptions(request, runtimeOptions);
LocalDateTime end = LocalDateTime.now();
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
@@ -90,7 +92,15 @@ public class AliyunSmsSender implements SmsSender {
message.setStatus(MessageStatusConstant.FAILED);
message.setErrorCode(response.body.code);
message.setErrorMsg(response.body.message);
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.body.code);
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if(providerErrorCodeMappingDO.getShouldRetry() == 1){
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}
@@ -150,10 +160,20 @@ public class AliyunSmsSender implements SmsSender {
message.setErrorCode(detail.errCode);
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if(providerErrorCodeMappingDO.getShouldRetry() == 1){
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
} else {
message.setStatus(MessageStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
}
});
// }

View File

@@ -118,6 +118,9 @@ public class TelecomSmsSender implements SmsSender {
LocalDateTime now = LocalDateTime.now();
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
message.setSendTime(now);
// 更新上次重试时间
this.sender.messageRetryQueueService.updateLastRetryTime(message.getMessageId(), now);
// this.setMessageIdToRedis(message);
// 发送请求
ResponseEntity<String> response1 = this.sender.restTemplateUtil.post(
telecomSmsSetting.getApiUrl(),
@@ -138,8 +141,15 @@ public class TelecomSmsSender implements SmsSender {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + "");
message.setStatus(MessageStatusConstant.FAILED);
message.setErrorCode(telecomSmsSendResponse.list.get(0).result + "");
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if(providerErrorCodeMappingDO.getShouldRetry() == 1){
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
}
});
@@ -150,7 +160,6 @@ public class TelecomSmsSender implements SmsSender {
} catch (Exception e) {
throw new RuntimeException(e);
}
return b;
}
@@ -260,23 +269,68 @@ public class TelecomSmsSender implements SmsSender {
// if (response.getStatusCode() == HttpStatus.OK) {
TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class);
TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0);
// if (StrUtil.isBlank(message.getErrorMsg())) {
// message.setStatus(MessageStatusConstant.FAILED);
// ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), "MA:0006");
// message.setErrorCode(telecomSmsSelectDetailRes.getStat());
// if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
// message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
// } else {
// message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
// }
// this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
// this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
// } else {
// message.setStatus(MessageStatusConstant.SUCCESS);
// }
if (telecomSmsSelectDetailRes.getStatus() == 5) {
message.setStatus(MessageStatusConstant.FAILED);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat());
message.setErrorCode(telecomSmsSelectDetailRes.getStat());
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
if(providerErrorCodeMappingDO.getShouldRetry() == 1){
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
}
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.FAILED, message.getErrorCode(), message.getErrorMsg());
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
}
if (telecomSmsSelectDetailRes.getStatus() == 4) {
message.setStatus(MessageStatusConstant.SUCCESS);
this.sender.messageRecordMapper.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null);
this.sender.messageRetryHistoryService.updateStatusAndErrorInfo(message.getMessageId(), MessageStatusConstant.SUCCESS, null, null);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
}
// }
this.scheduledExecutorService.shutdown();
this.scheduledExecutorService = null;
}, 10, TimeUnit.SECONDS);
}
/**
* 从redis中判断是否可以发送消息
*
* @param message
* @return 将消息ID缓存到redis中
* @param message
* <p>
* 将消息ID缓存到redis中
* @param message
*/
// private boolean canSend(MessageRecordDO message) {
// String cachedData = this.sender.redisTemplate.opsForValue().get(message.getMessageId());
// return ObjectUtil.isNull(cachedData);
// }
/**
* 将消息ID缓存到redis中
*
* @param message
*/
// private void setMessageIdToRedis(MessageRecordDO message) {
// this.sender.redisTemplate.opsForValue().set(message.getMessageId(), message.getMessageId(), 60, TimeUnit.SECONDS);
// }
}

View File

@@ -11,7 +11,7 @@ import lombok.EqualsAndHashCode;
* @description 服务商错误码映射表
*/
@Data
@TableName("push_channel_provider_config")
@TableName("push_provider_error_code_mapping")
@EqualsAndHashCode(callSuper = true)
public class ProviderErrorCodeMappingDO extends BaseDO {
/**

View File

@@ -2,10 +2,10 @@ package com.njcn.msgpush.module.push.dal.mysql.message;
import com.njcn.msgpush.framework.mybatis.core.mapper.BaseMapperX;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import jakarta.validation.constraints.Pattern;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper
public interface MessageRecordMapper extends BaseMapperX<MessageRecordDO> {
int updateStatusAndErrorInfo(@Param("messageId") String messageId, @Param("status") String status, @Param("errorCode") String errorCode, @Param("errorMsg") String errorMsg);
}

View File

@@ -3,5 +3,11 @@
<mapper namespace="com.njcn.msgpush.module.push.dal.mysql.message.MessageRecordMapper">
<update id="updateStatusAndErrorInfo">
UPDATE push_message_record
SET status = #{status},
error_code = #{errorCode},
error_msg = #{errorMsg}
WHERE message_id = #{messageId}
</update>
</mapper>

View File

@@ -25,6 +25,7 @@ public class MessageRetryJob {
*/
@Scheduled(fixedRate = 3000)
public void processSmsRetryQueue() {
log.info("开始处理短信重试队列:{}", LocalDateTime.now());
messageRetryQueueService.processRetryBatch("sms");
}
@@ -33,7 +34,6 @@ public class MessageRetryJob {
*/
@Scheduled(fixedRate = 3000)
public void processEmailRetryQueue() {
log.info("开始处理邮件重试队列:{}", LocalDateTime.now());
messageRetryQueueService.processRetryBatch("email");
}

View File

@@ -2,9 +2,7 @@ package com.njcn.msgpush.module.push.service.message;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.njcn.msgpush.module.push.controller.admin.message.vo.MessageRecordReqVO;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ChannelProviderConfigDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
import org.apache.ibatis.annotations.Param;
import java.io.Serializable;
import java.util.Collection;
@@ -22,6 +20,7 @@ public interface MessageRecordService {
/**
* 处理发送消息
*
* @param messageRecordDO
* @return 发送是否成功的结果
*/
@@ -50,9 +49,11 @@ public interface MessageRecordService {
*
* @param messageId
* @param status
* @param errorCode
* @param errorMsg
* @return
*/
boolean updateStatus(String messageId, String status);
boolean updateStatusAndErrorInfo(String messageId, String status, String errorCode, String errorMsg);
/**
* 更新消息记录重试次数

View File

@@ -82,9 +82,11 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id");
if (sendResult) {
this.updateStatus(messageRecordDO.getMessageId(), MessageStatusConstant.SUCCESS);
this.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.SUCCESS, null, null);
messageRetryHistoryDO.setStatus(MessageStatusConstant.SUCCESS);
} else {
this.updateStatus(messageRecordDO.getMessageId(), MessageStatusConstant.FAILED);
this.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.FAILED, messageRecordDO.getErrorCode(), messageRecordDO.getErrorMsg());
messageRetryHistoryDO.setStatus(MessageStatusConstant.FAILED);
}
messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId()));
messageRetryHistoryService.add(messageRetryHistoryDO);
@@ -94,8 +96,8 @@ public class MessageRecordServiceImpl extends ServiceImpl<MessageRecordMapper, M
@Override
@Transactional(rollbackFor = Exception.class)
public boolean updateStatus(String messageId, String status) {
return this.lambdaUpdate().eq(MessageRecordDO::getMessageId, messageId).set(MessageRecordDO::getStatus, status).update();
public boolean updateStatusAndErrorInfo(String messageId, String status, String errorCode, String errorMsg) {
return this.baseMapper.updateStatusAndErrorInfo(messageId, status, errorCode, errorMsg) > 0;
}
@Override

View File

@@ -16,4 +16,15 @@ public interface MessageRetryHistoryService {
* @return
*/
Integer getMaxRetrySequence(String messageId);
/**
* 更新消息错误信息
*
* @param messageId 消息ID
* @param status 状态
* @param errorCode 错误码
* @param errorMsg 错误信息
* @return
*/
boolean updateStatusAndErrorInfo(String messageId, String status, String errorCode, String errorMsg);
}

View File

@@ -24,4 +24,19 @@ public class MessageRetryHistoryServiceImpl extends ServiceImpl<MessageRetryHist
.one();
return ObjectUtil.isNull(one) ? 0 : one.getRetrySequence() + 1;
}
@Override
public boolean updateStatusAndErrorInfo(String messageId, String status, String errorCode, String errorMsg) {
MessageRetryHistoryDO one = this.lambdaQuery()
.eq(MessageRetryHistoryDO::getMessageId, messageId)
.orderByDesc(MessageRetryHistoryDO::getRetrySequence)
.last("limit 1").one();
if (ObjectUtil.isNotNull(one)) {
one.setStatus(status);
one.setErrorCode(errorCode);
one.setErrorMsg(errorMsg);
return this.updateById(one);
}
return false;
}
}

View File

@@ -52,4 +52,6 @@ public interface MessageRetryQueueService {
boolean updateRetryInfo(String messageId, int retryCount, LocalDateTime nextRetryTime, LocalDateTime lastRetryTime, String lastErrorMsg);
boolean deleteByMessageIds(List<String> messageIds);
boolean updateLastRetryTime(String messageId, LocalDateTime now);
}

View File

@@ -236,6 +236,14 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
.update();
}
@Override
public boolean updateLastRetryTime(String messageId, LocalDateTime now) {
return this.lambdaUpdate()
.eq(MessageRetryQueueDO::getMessageId, messageId)
.set(MessageRetryQueueDO::getLastRetryTime, now)
.update();
}
/**
* 处理单个消息的重试逻辑
*/
@@ -250,6 +258,8 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
log.info("处理消息重试成功逻辑messageId={}", messageRecordDO.getMessageId());
messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId());
this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId()));
// todo 重试成功后是否要删除retry_history表中的数据
} else {
// 重试失败,更新重试信息
log.error("处理消息重试失败逻辑messageId={}", messageRecordDO.getMessageId());
@@ -289,7 +299,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl<MessageRetryQueueM
// 达到最大重试次数,标记为最终失败
// 更新消息的状态为final_failed
messageRecordService.updateStatus(messageRecordDO.getMessageId(), MessageStatusConstant.FINALFAILED);
messageRecordService.updateStatusAndErrorInfo(messageRecordDO.getMessageId(), MessageStatusConstant.FINALFAILED, messageRecordDO.getErrorCode(), messageRecordDO.getErrorMsg());
// 数据库中不能删除
// retryQueueMapper.deleteByMessageId(messageRecordDO.getMessageId());

View File

@@ -56,6 +56,7 @@ knife4j:
# MyBatis Plus 的配置项
mybatis-plus:
mapper-locations: classpath*:com/njcn/msgpush/**/mapping/*.xml
configuration:
map-underscore-to-camel-case: true # 虽然默认为 true ,但是还是显示去指定下。
global-config: