From 478f8a4c3daa434677699f7f0ad071ed3647b273 Mon Sep 17 00:00:00 2001 From: caozehui <2427765068@qq.com> Date: Tue, 10 Mar 2026 10:35:42 +0800 Subject: [PATCH] =?UTF-8?q?=E7=94=B5=E4=BF=A1=E8=8E=B7=E5=8F=96=E4=B8=8B?= =?UTF-8?q?=E8=A1=8C=E4=BF=A1=E6=81=AF=E5=AE=8C=E5=96=84=E3=80=81=E8=A1=A5?= =?UTF-8?q?=E5=85=85=E6=B6=88=E6=81=AF=E9=87=8D=E8=AF=95=E9=98=9F=E5=88=97?= =?UTF-8?q?=E3=80=81=E6=B6=88=E6=81=AF=E9=87=8D=E8=AF=95=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/push/PushServerApplication.java | 1 - .../module/push/client/sender/SmsSender.java | 7 ++ .../client/sender/impl/AliyunEmailSender.java | 43 +++++-- .../client/sender/impl/AliyunSmsSender.java | 29 +++-- .../client/sender/impl/TelecomSmsSender.java | 113 +++++++++++++----- .../module/push/constant/MsgPushConstant.java | 2 + .../admin/message/vo/MessageRecordReqVO.java | 7 ++ .../dataobject/retry/MessageRetryQueueDO.java | 2 +- .../mysql/retry/MessageRetryQueueMapper.java | 32 ----- .../retry/mapping/MessageRetryQueueMapper.xml | 27 +---- .../module/push/job/MessageRetryJob.java | 1 + .../channel/ChannelProviderConfigService.java | 2 + .../ChannelProviderConfigServiceImpl.java | 9 ++ .../service/message/MessageRecordService.java | 8 ++ .../message/MessageRecordServiceImpl.java | 26 +++- .../retry/MessageRetryQueueService.java | 23 ++-- .../retry/MessageRetryQueueServiceImpl.java | 89 +++++++------- .../module/push/sms/MsgPushClientTest.java | 26 ++++ 18 files changed, 285 insertions(+), 162 deletions(-) diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/PushServerApplication.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/PushServerApplication.java index 8d34da9..aea991a 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/PushServerApplication.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/PushServerApplication.java @@ -18,6 +18,5 @@ import org.springframework.scheduling.annotation.EnableScheduling; public class PushServerApplication { public static void main(String[] args) { SpringApplication.run(PushServerApplication.class, args); - } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/SmsSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/SmsSender.java index ca684fd..6fee231 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/SmsSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/SmsSender.java @@ -24,4 +24,11 @@ public interface SmsSender { * @return 发送结果 */ boolean sendBatchSms(List messageList); + + /** + * 查询模板信息 + * + * @param templateIdentifier 模板唯一标识符,例如:模板id、模板code + */ + void queryTemplate(String templateIdentifier); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java index c49d714..84b6cbb 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunEmailSender.java @@ -1,6 +1,8 @@ package com.njcn.msgpush.module.push.client.sender.impl; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.aliyun.dm20151123.Client; import com.aliyun.dm20151123.models.SingleSendMailRequest; import com.aliyun.dm20151123.models.SingleSendMailResponse; @@ -9,13 +11,17 @@ import com.aliyun.teautil.models.RuntimeOptions; import com.njcn.msgpush.module.push.client.sender.EmailSender; import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting; +import com.njcn.msgpush.module.push.constant.MsgPushConstant; +import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Map; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; + +import static com.aliyun.teautil.Common.toJSONString; /** * @author caozehui @@ -28,6 +34,9 @@ public class AliyunEmailSender implements EmailSender { public static final String ACCOUNT_NAME = "accountName"; public static final String REPLY_TO_ADDRESS = "replyToAddress"; + + public static final String FROM_ALIAS = "fromAlias"; + public static final String SUBJECT = "subject"; public static final String HTML_BODY = "htmlBody"; public static final String TEXT_BODY = "textBody"; @@ -55,20 +64,40 @@ public class AliyunEmailSender implements EmailSender { public boolean sendEmail(MessageRecordDO message, Map params) { RuntimeOptions runtimeOptions = new RuntimeOptions(); runtimeOptions.autoretry = true; + + String extraInfo = message.getExtraInfo(); + JSONObject jsonObject = JSON.parseObject(extraInfo); SingleSendMailRequest request = new SingleSendMailRequest() - .setAccountName(params.get(ACCOUNT_NAME).toString()) + .setAccountName(jsonObject.get(ACCOUNT_NAME).toString()) .setAddressType(1) - .setReplyToAddress((boolean) params.get(REPLY_TO_ADDRESS)) + .setReplyToAddress((boolean) jsonObject.get(REPLY_TO_ADDRESS)) .setToAddress(message.getReceiver()) //目标地址,多个 email 地址可以用逗号分隔,最多 100 个地址(支持邮件组)。 - .setSubject(params.get(SUBJECT).toString()) - .setHtmlBody(params.get(HTML_BODY).toString()) //HtmlBody 和 TextBody 是针对不同类型的邮件 - .setTextBody(params.get(TEXT_BODY).toString()); + .setSubject(message.getTitle()) + .setHtmlBody(message.getContent()) //HtmlBody 和 TextBody 是针对不同类型的邮件 + .setTextBody("") + .setFromAlias(jsonObject.get(FROM_ALIAS).toString()); try { + LocalDateTime now = LocalDateTime.now(); + long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + message.setSendTime(now); SingleSendMailResponse response = this.emailClient.singleSendMailWithOptions(request, runtimeOptions); + LocalDateTime end = LocalDateTime.now(); + message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start)); + System.out.println(toJSONString(response)); if (HttpStatus.OK.value() == response.getStatusCode()) { + this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); return true; } else { + message.setErrorCode(response.getStatusCode() + ""); + ProviderErrorCodeMappingDO providerErrorCode = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.getStatusCode() + ""); + if (ObjectUtil.isNotNull(providerErrorCode)) { + message.setErrorMsg(providerErrorCode.getOriginalMessage()); + } else { + message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); + } + this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); + this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); return false; } } catch (Exception e) { diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java index fa739e0..71023c2 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/AliyunSmsSender.java @@ -12,9 +12,11 @@ import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.sender.SmsSender; import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting; import com.njcn.msgpush.module.push.constant.MessageStatusConstant; +import com.njcn.msgpush.module.push.constant.MsgPushConstant; import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; import java.time.LocalDateTime; import java.time.ZoneId; @@ -40,11 +42,6 @@ public class AliyunSmsSender implements SmsSender { private Client smsClient; - /** - * 存放发送完成的消息。key为其返回的mid - */ -// private Map completeSendMessageMap = new HashMap<>(); - private ScheduledExecutorService scheduledExecutorService; public AliyunSmsSender(AliYunMailSetting aliYunSmsSetting, Sender sender) { @@ -84,13 +81,14 @@ public class AliyunSmsSender implements SmsSender { LocalDateTime end = LocalDateTime.now(); message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start)); System.out.println(toJSONString(response)); - if (OK.equals(response.body.code)) { + if (HttpStatus.OK.value() == response.getStatusCode()) { this.getDownInfo(response.body.bizId, message); + this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); return true; } else { message.setErrorCode(response.body.code); message.setErrorMsg(response.body.message); - this.sender.messageRetryQueueService.addRetryMessage(message); + this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); return false; } @@ -110,14 +108,19 @@ public class AliyunSmsSender implements SmsSender { } @Override - public boolean sendBatchSms(List messageIdList) { + public boolean sendBatchSms(List messageList) { boolean res = true; - for (MessageRecordDO message : messageIdList) { + for (MessageRecordDO message : messageList) { res &= this.sendSms(message); } return res; } + @Override + public void queryTemplate(String templateIdentifier) { + + } + /** * 获取下行信息 * @@ -143,8 +146,12 @@ public class AliyunSmsSender implements SmsSender { if (!"DELIVERED".equals(detail.errCode)) { ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), detail.errCode); message.setErrorCode(detail.errCode); - message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); - this.sender.messageRetryQueueService.addRetryMessage(message); + if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { + message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); + } else { + message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); + } + this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); } }); // } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java index 99f38fe..dfe9639 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/client/sender/impl/TelecomSmsSender.java @@ -1,11 +1,14 @@ package com.njcn.msgpush.module.push.client.sender.impl; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.sender.SmsSender; import com.njcn.msgpush.module.push.client.setting.impl.TelecomSmsSetting; import com.njcn.msgpush.module.push.constant.MessageStatusConstant; +import com.njcn.msgpush.module.push.constant.MsgPushConstant; import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import lombok.Data; @@ -24,6 +27,8 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static com.alibaba.fastjson.JSON.toJSON; + /** * @author caozehui * @data 2026-02-11 @@ -85,12 +90,26 @@ public class TelecomSmsSender implements SmsSender { message.setStatus(MessageStatusConstant.SENDING); // 构建请求参数 Map request = new HashMap<>(); - request.put("action", "send"); + boolean isTemplateSend = StrUtil.isNotBlank(message.getTemplateCode()); request.put("account", telecomSmsSetting.getAccount()); request.put("password", telecomSmsSetting.getPassword()); - request.put("mobile", message.getReceiver()); - request.put("content", message.getContent()); request.put("extno", telecomSmsSetting.getExtno()); + if (isTemplateSend) { + request.put("action", "templatep2p"); + Map templateJsonMap = new HashMap<>(); + templateJsonMap.put("templateID", message.getTemplateCode()); + JSONObject jsonObject = JSON.parseObject(message.getTemplateParams()); + Map variable = new HashMap<>(); + variable.put("mobile", message.getReceiver()); + jsonObject.forEach((key, value) -> variable.put(key, value.toString())); + templateJsonMap.put("variable", "[" + toJSON(variable) + "]"); + request.put("templateJson", toJSON(templateJsonMap)); + } else { + request.put("action", "send"); + request.put("mobile", message.getReceiver()); + request.put("content", message.getContent()); + } + // 设置请求头 HttpHeaders headers = new HttpHeaders(); @@ -119,7 +138,7 @@ public class TelecomSmsSender implements SmsSender { ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + ""); message.setErrorCode(telecomSmsSendResponse.list.get(0).result + ""); message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); - this.sender.messageRetryQueueService.addRetryMessage(message); + this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); return false; } }); @@ -136,43 +155,71 @@ public class TelecomSmsSender implements SmsSender { @Override public boolean sendBatchSms(List messageList) { +// Map request = new HashMap<>(); +// request.put("action", "p2p"); +// request.put("account", telecomSmsSetting.getAccount()); +// request.put("password", telecomSmsSetting.getPassword()); +// Map mobileContentKvp = new HashMap<>(); +// for (MessageRecordDO message : messageList) { +// mobileContentKvp.put(message.getReceiver(), "【" + message.getTitle() + "】" + message.getContent()); +// message.setStatus(MessageStatusConstant.SENDING); +// } +// request.put("mobileContentKvp", JSON.toJSONString(mobileContentKvp)); +// request.put("extno", telecomSmsSetting.getExtno()); +// +// // 设置请求头 +// HttpHeaders headers = new HttpHeaders(); +// headers.set("Content-Type", CONTENT_TYPE); +// +// LocalDateTime now = LocalDateTime.now(); +// long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); +// +// // 发送请求 +// ResponseEntity response = this.sender.restTemplateUtil.post( +// telecomSmsSetting.getApiUrl(), +// request, +// headers, +// String.class +// ); +// LocalDateTime end = LocalDateTime.now(); +// for (MessageRecordDO message : messageList) { +// message.setSendTime(now); +// message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start)); +// } +// TelecomSmsSendResponse telecomSmsSendResponse = JSON.parseObject(response.getBody(), TelecomSmsSendResponse.class); +// boolean res = true; +// for (TelecomSmsSendDetailRes telecomSmsSendDetailRes : telecomSmsSendResponse.list) { +// res &= telecomSmsSendDetailRes.result == 0; +// +// // 定时任务,指定时间间隔后获取下行信息 +// //this.getDownInfo(telecomSmsSendDetailRes.getMid(), message); +// } +// return res; + + boolean res = true; + for (MessageRecordDO message : messageList) { + res &= this.sendSms(message); + } + return res; + } + + @Override + public void queryTemplate(String templateIdentifier) { Map request = new HashMap<>(); - request.put("action", "p2p"); + request.put("action", "templateSelect"); request.put("account", telecomSmsSetting.getAccount()); request.put("password", telecomSmsSetting.getPassword()); - Map mobileContentKvp = new HashMap<>(); - for (MessageRecordDO message : messageList) { - mobileContentKvp.put(message.getReceiver(), "【" + message.getTitle() + "】" + message.getContent()); - message.setStatus(MessageStatusConstant.SENDING); - } - request.put("mobileContentKvp", JSON.toJSONString(mobileContentKvp)); - request.put("extno", telecomSmsSetting.getExtno()); + request.put("templateJson", StrUtil.isBlank(templateIdentifier) ? 0 : templateIdentifier); - // 设置请求头 HttpHeaders headers = new HttpHeaders(); headers.set("Content-Type", CONTENT_TYPE); - - LocalDateTime now = LocalDateTime.now(); - long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - - // 发送请求 ResponseEntity response = this.sender.restTemplateUtil.post( telecomSmsSetting.getApiUrl(), request, headers, String.class ); - LocalDateTime end = LocalDateTime.now(); - for (MessageRecordDO message : messageList) { - message.setSendTime(now); - message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start)); - } - TelecomSmsSendResponse telecomSmsSendResponse = JSON.parseObject(response.getBody(), TelecomSmsSendResponse.class); - boolean res = true; - for (TelecomSmsSendDetailRes telecomSmsSendDetailRes : telecomSmsSendResponse.list) { - res &= telecomSmsSendDetailRes.result == 0; - } - return res; + System.out.println(toJSON(response)); } /** @@ -215,8 +262,12 @@ public class TelecomSmsSender implements SmsSender { if (telecomSmsSelectDetailRes.getStatus() == 5) { ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat()); message.setErrorCode(telecomSmsSelectDetailRes.getStat()); - message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); - this.sender.messageRetryQueueService.addRetryMessage(message); + if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { + message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); + } else { + message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); + } + this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); } // } this.scheduledExecutorService.shutdown(); diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/MsgPushConstant.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/MsgPushConstant.java index 607b320..a0e7715 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/MsgPushConstant.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/constant/MsgPushConstant.java @@ -12,4 +12,6 @@ public class MsgPushConstant { public static final String CHANNEL_SMS = "sms"; public static final String CHANNEL_EMAIL = "email"; public static final String CHANNEL_APP_PUSH = "app_push"; + + public static final String ERROR_MSG_UNKNOWN = "未知错误"; } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java index 9009bd5..2d15b16 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/controller/admin/message/vo/MessageRecordReqVO.java @@ -1,8 +1,10 @@ package com.njcn.msgpush.module.push.controller.admin.message.vo; +import cn.hutool.core.lang.RegexPool; import com.njcn.msgpush.framework.common.pojo.PageParam; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.Pattern; import lombok.Data; /** @@ -29,10 +31,12 @@ public class MessageRecordReqVO extends PageParam { @Schema(description = "消息类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "verify_code/order_notify/marketing/system_notify") @NotBlank(message = "消息类型不能为空") +// @InEnum(value = CommonStatusEnum.class,message = "消息类型错误") private String messageType; @Schema(description = "接收者", requiredMode = Schema.RequiredMode.REQUIRED) @NotBlank(message = "接收者不能为空") + @Pattern(regexp = RegexPool.EMAIL + "|" + RegexPool.MOBILE, message = "必须是有效的邮箱或手机号格式") private String receiver; @Schema(description = "标题", requiredMode = Schema.RequiredMode.REQUIRED) @@ -53,4 +57,7 @@ public class MessageRecordReqVO extends PageParam { @Schema(description = "第三方消息ID") private String thirdPartyId; + + @Schema(description = "额外信息") + private String extraInfo; } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java index 8108677..c7687b6 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/dataobject/retry/MessageRetryQueueDO.java @@ -13,7 +13,7 @@ import java.time.LocalDateTime; * @description 消息重试队列表对应数据对象 */ @Data -@TableName("message_retry_queue") +@TableName("push_message_retry_queue") @EqualsAndHashCode(callSuper = true) public class MessageRetryQueueDO extends BaseDO { /** diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/MessageRetryQueueMapper.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/MessageRetryQueueMapper.java index cef034b..14edc0c 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/MessageRetryQueueMapper.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/MessageRetryQueueMapper.java @@ -20,36 +20,4 @@ public interface MessageRetryQueueMapper extends BaseMapperX selectNeedRetryMessages(@Param("currentTime") LocalDateTime currentTime, @Param("limit") int limit); - - /** - * 更新重试信息 - * - * @param messageId 消息ID - * @param retryCount 重试次数 - * @param nextRetryTime 下次重试时间 - * @param lastRetryTime 最后重试时间 - * @param lastErrorMsg 最后错误信息 - * @return 影响行数 - */ - int updateRetryInfo(@Param("messageId") String messageId, - @Param("retryCount") Integer retryCount, - @Param("nextRetryTime") LocalDateTime nextRetryTime, - @Param("lastRetryTime") LocalDateTime lastRetryTime, - @Param("lastErrorMsg") String lastErrorMsg); - - /** - * 删除成功的重试记录 - * - * @param messageId 消息ID - * @return 影响行数 - */ - int deleteByMessageId(@Param("messageId") String messageId); - - /** - * 批量删除多个消息ID的重试记录 - * - * @param messageIds 消息ID列表 - * @return 影响行数 - */ - int deleteByMessageIds(@Param("messageIds") List messageIds); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml index 9282eaf..cfadc63 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/dal/mysql/retry/mapping/MessageRetryQueueMapper.xml @@ -4,34 +4,11 @@ - - UPDATE message_retry_queue - SET retry_count = #{retryCount}, - next_retry_time = #{nextRetryTime}, - last_retry_time = #{lastRetryTime}, - last_error_msg = #{lastErrorMsg}, - update_time = NOW() - WHERE message_id = #{messageId} and deleted != 0 - - - - DELETE - FROM message_retry_queue - WHERE message_id = #{messageId} - - - - DELETE FROM message_retry_queue - WHERE message_id IN - - #{messageId} - - diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java index 8a74a99..a7c3219 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/job/MessageRetryJob.java @@ -33,6 +33,7 @@ public class MessageRetryJob { */ @Scheduled(fixedRate = 3000) public void processEmailRetryQueue() { + log.info("开始处理邮件重试队列:{}", LocalDateTime.now()); messageRetryQueueService.processRetryBatch("email"); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/channel/ChannelProviderConfigService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/channel/ChannelProviderConfigService.java index 6e7d3d0..79565f2 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/channel/ChannelProviderConfigService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/channel/ChannelProviderConfigService.java @@ -30,4 +30,6 @@ public interface ChannelProviderConfigService extends IService 5 ? 0 : 1); this.updateById(byTypeAndChannel); } + + @Override + public void successUpdate(String providerType, String channel) { + ChannelProviderConfigDO byTypeAndChannel = this.getByTypeAndChannel(providerType, channel); + byTypeAndChannel.setFailureCount(0); + byTypeAndChannel.setLastFailureTime(null); + byTypeAndChannel.setHealthStatus(1); + this.updateById(byTypeAndChannel); + } } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java index 000bd21..a278539 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordService.java @@ -53,4 +53,12 @@ public interface MessageRecordService { * @return */ boolean updateStatus(String messageId, String status); + + /** + * 更新消息记录重试次数 + * + * @param messageId + * @return + */ + boolean updateRetryCount(String messageId); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java index b948835..6936aa3 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/message/MessageRecordServiceImpl.java @@ -1,6 +1,7 @@ package com.njcn.msgpush.module.push.service.message; import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -66,22 +67,28 @@ public class MessageRecordServiceImpl extends ServiceImpl messageProviderFactory.createSmsSender(channelProviderConfigDO, sender).sendSms(messageRecordDO); - case MsgPushConstant.CHANNEL_EMAIL -> - messageProviderFactory.createEmailSender(channelProviderConfigDO, sender).sendEmail(messageRecordDO, new HashMap<>()); + case MsgPushConstant.CHANNEL_EMAIL -> { + Map params = new HashMap<>(); + yield messageProviderFactory.createEmailSender(channelProviderConfigDO, sender).sendEmail(messageRecordDO, params); + } case MsgPushConstant.CHANNEL_APP_PUSH -> messageProviderFactory.createAppPushSender(channelProviderConfigDO, sender).appPush(messageRecordDO); default -> throw new RuntimeException("暂不支持该渠道:" + messageRecordDO.getChannel()); }; - this.updateStatus(messageRecordDO.getMessageId(), sendResult ? MessageStatusConstant.SUCCESS : MessageStatusConstant.FAILED); - MessageRetryHistoryDO messageRetryHistoryDO = BeanUtil.copyProperties(messageRecordDO, MessageRetryHistoryDO.class, "id"); + if (sendResult) { + this.updateStatus(messageRecordDO.getMessageId(), MessageStatusConstant.SUCCESS); + } else { + this.updateStatus(messageRecordDO.getMessageId(), MessageStatusConstant.FAILED); + } messageRetryHistoryDO.setRetrySequence(messageRetryHistoryService.getMaxRetrySequence(messageRecordDO.getMessageId())); messageRetryHistoryService.add(messageRetryHistoryDO); + return sendResult; } @@ -91,6 +98,13 @@ public class MessageRecordServiceImpl extends ServiceImpl listByIds(Collection ids) { - return this.baseMapper.selectByIds(ids); + return this.lambdaQuery().in(CollectionUtil.isNotEmpty(ids), MessageRecordDO::getMessageId, ids).list(); } @Override diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueService.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueService.java index c28898e..0e2673e 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueService.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueService.java @@ -5,6 +5,7 @@ import com.njcn.msgpush.module.push.controller.admin.retry.vo.MessageRetryQueueR import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.retry.MessageRetryQueueDO; +import java.time.LocalDateTime; import java.util.List; public interface MessageRetryQueueService { @@ -13,7 +14,7 @@ public interface MessageRetryQueueService { * * @param message 消息 */ - void addRetryMessage(MessageRecordDO message); + void saveOrUpdateRetryMessage(MessageRecordDO message); /** * 批量处理重试消息 @@ -30,13 +31,6 @@ public interface MessageRetryQueueService { */ void manualRetry(List messageIds); - /** - * 移除重试记录 - * - * @param messageId 消息ID - * @return 是否成功 - */ - boolean removeRetryRecord(String messageId); /** * 分页查询重试队列 @@ -45,4 +39,17 @@ public interface MessageRetryQueueService { * @return 分页结果 */ PageResult getPage(MessageRetryQueueReqVO reqVO); + + /** + * 更新重试信息 + * + * @param messageId 消息ID + * @param retryCount 重试次数 + * @param nextRetryTime 下次重试时间 + * @param lastRetryTime 最后重试时间 + * @param lastErrorMsg 最后错误信息 + */ + boolean updateRetryInfo(String messageId, int retryCount, LocalDateTime nextRetryTime, LocalDateTime lastRetryTime, String lastErrorMsg); + + boolean deleteByMessageIds(List messageIds); } diff --git a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java index 97c8538..1e624fa 100644 --- a/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java +++ b/msgpush-module-push/msgpush-module-push-server/src/main/java/com/njcn/msgpush/module/push/service/retry/MessageRetryQueueServiceImpl.java @@ -22,6 +22,7 @@ import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -40,6 +41,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl() @@ -86,20 +88,30 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl retryMessages = retryQueueMapper.selectNeedRetryMessages(LocalDateTime.now(), DEFAULT_BATCH_SIZE); // 从redis中查询需要重试的消息 - Set needRetryMessageIds = messageRetryRedisDAO.getNeedRetryMessageIds(channel, LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), DEFAULT_BATCH_SIZE); + long epochMilli = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + Set needRetryMessageIds = messageRetryRedisDAO.getNeedRetryMessageIds(channel, epochMilli, DEFAULT_BATCH_SIZE); // 没有需要重试的消息 if (needRetryMessageIds.isEmpty()) { @@ -175,6 +189,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl messageRecordDOList = messageRecordService.listByIds(needRetryMessageIds); + System.out.println("messageRecordDOList.size()=:" + messageRecordDOList.size()); for (MessageRecordDO messageRecordDO : messageRecordDOList) { processSingleRetry(messageRecordDO); @@ -204,27 +219,21 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl() - .eq(MessageRetryQueueDO::getMessageId, messageId) - ); - - if (retryRecord == null) { - log.warn("未找到重试记录: messageId={}", messageId); - return false; - } - - // 从数据库删除 - super.baseMapper.deleteByMessageId(messageId); - - // 从Redis删除 - messageRetryRedisDAO.removeFromRetryQueue(retryRecord.getChannel(), messageId); - - log.info("删除重试记录成功: messageId={}", messageId); - return true; + public boolean updateRetryInfo(String messageId, int retryCount, LocalDateTime nextRetryTime, LocalDateTime lastRetryTime, String s) { + return this.lambdaUpdate() + .set(MessageRetryQueueDO::getRetryCount, retryCount) + .set(MessageRetryQueueDO::getNextRetryTime, nextRetryTime) + .set(MessageRetryQueueDO::getLastRetryTime, lastRetryTime) + .eq(MessageRetryQueueDO::getMessageId, messageId) + .update(); + } + @Override + public boolean deleteByMessageIds(List messageIds) { + return this.lambdaUpdate() + .set(MessageRetryQueueDO::getDeleted, true) + .in(CollUtil.isNotEmpty(messageIds), MessageRetryQueueDO::getMessageId, messageIds) + .update(); } /** @@ -232,16 +241,15 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl sendFuture = CompletableFuture.supplyAsync(() -> - messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR - ); + CompletableFuture sendFuture = CompletableFuture.supplyAsync(() -> messageRecordService.processSendMsg(messageRecordDO), MSG_RETRY_THREAD_POOL_EXECUTOR); sendFuture.orTimeout(5, TimeUnit.SECONDS) .thenAccept(sendResult -> { + messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); if (sendResult) { log.info("处理消息重试成功逻辑:messageId={}", messageRecordDO.getMessageId()); - super.baseMapper.deleteByMessageId(messageRecordDO.getMessageId()); messageRetryRedisDAO.removeFromRetryQueue(messageRecordDO.getChannel(), messageRecordDO.getMessageId()); + this.deleteByMessageIds(Collections.singletonList(messageRecordDO.getMessageId())); } else { // 重试失败,更新重试信息 log.error("处理消息重试失败逻辑:messageId={}", messageRecordDO.getMessageId()); @@ -249,6 +257,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl { + messageRecordService.updateRetryCount(messageRecordDO.getMessageId()); log.error("异步执行消息重试发生异常:messageId={}", messageRecordDO.getMessageId(), ex); // 发生异常时也尝试处理失败逻辑,避免消息丢失 try { @@ -273,7 +282,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl= messageRetryQueueDO.getMaxRetry()) { @@ -290,7 +299,7 @@ public class MessageRetryQueueServiceImpl extends ServiceImpl messageProviderFactoryMap; + + @Autowired + private Sender sender; + @Test public void testSend() throws InterruptedException { List messageIdList = new ArrayList<>(); @@ -56,4 +69,17 @@ public class MsgPushClientTest { Thread.sleep(9000); System.out.println(sendResult); } + + @Test + public void templateSelect() { + MessageProviderFactory messageProviderFactory = messageProviderFactoryMap.get(MsgPushConstant.PROVIDER_TYPE_TELECOM); + ChannelProviderConfigDO config = new ChannelProviderConfigDO(); + config.setApiUrl("https://sms.ymeeting.cn/smsv2"); + config.setAppKey("925631"); + config.setAppSecret("AMW2pOVrdky"); + config.setExtno("106905631"); + SmsSender smsSender = messageProviderFactory.createSmsSender(config, sender); + String templateIdentifier = "SMS_481710295"; + smsSender.queryTemplate(templateIdentifier); + } }