修改消息的状态

This commit is contained in:
caozehui
2026-03-10 10:46:44 +08:00
parent 478f8a4c3d
commit 6a0bb5f23e
3 changed files with 56 additions and 33 deletions

View File

@@ -11,6 +11,7 @@ import com.aliyun.teautil.models.RuntimeOptions;
import com.njcn.msgpush.module.push.client.sender.EmailSender; import com.njcn.msgpush.module.push.client.sender.EmailSender;
import com.njcn.msgpush.module.push.client.sender.Sender; import com.njcn.msgpush.module.push.client.sender.Sender;
import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting; import com.njcn.msgpush.module.push.client.setting.impl.AliYunMailSetting;
import com.njcn.msgpush.module.push.constant.MessageStatusConstant;
import com.njcn.msgpush.module.push.constant.MsgPushConstant; import com.njcn.msgpush.module.push.constant.MsgPushConstant;
import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO; import com.njcn.msgpush.module.push.dal.dataobject.channel.ProviderErrorCodeMappingDO;
import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO; import com.njcn.msgpush.module.push.dal.dataobject.message.MessageRecordDO;
@@ -20,6 +21,8 @@ import org.springframework.http.HttpStatus;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static com.aliyun.teautil.Common.toJSONString; import static com.aliyun.teautil.Common.toJSONString;
@@ -62,46 +65,59 @@ public class AliyunEmailSender implements EmailSender {
@Override @Override
public boolean sendEmail(MessageRecordDO message, Map<String, Object> params) { public boolean sendEmail(MessageRecordDO message, Map<String, Object> params) {
RuntimeOptions runtimeOptions = new RuntimeOptions(); Future<Boolean> future = this.sender.MSG_PUSH_THREAD_POOL_EXECUTOR.submit(() -> {
runtimeOptions.autoretry = true; message.setStatus(MessageStatusConstant.SENDING);
RuntimeOptions runtimeOptions = new RuntimeOptions();
runtimeOptions.autoretry = true;
String extraInfo = message.getExtraInfo(); String extraInfo = message.getExtraInfo();
JSONObject jsonObject = JSON.parseObject(extraInfo); JSONObject jsonObject = JSON.parseObject(extraInfo);
SingleSendMailRequest request = new SingleSendMailRequest() SingleSendMailRequest request = new SingleSendMailRequest()
.setAccountName(jsonObject.get(ACCOUNT_NAME).toString()) .setAccountName(jsonObject.get(ACCOUNT_NAME).toString())
.setAddressType(1) .setAddressType(1)
.setReplyToAddress((boolean) jsonObject.get(REPLY_TO_ADDRESS)) .setReplyToAddress((boolean) jsonObject.get(REPLY_TO_ADDRESS))
.setToAddress(message.getReceiver()) //目标地址,多个 email 地址可以用逗号分隔,最多 100 个地址(支持邮件组)。 .setToAddress(message.getReceiver()) //目标地址,多个 email 地址可以用逗号分隔,最多 100 个地址(支持邮件组)。
.setSubject(message.getTitle()) .setSubject(message.getTitle())
.setHtmlBody(message.getContent()) //HtmlBody 和 TextBody 是针对不同类型的邮件 .setHtmlBody(message.getContent()) //HtmlBody 和 TextBody 是针对不同类型的邮件
.setTextBody("") .setTextBody("")
.setFromAlias(jsonObject.get(FROM_ALIAS).toString()); .setFromAlias(jsonObject.get(FROM_ALIAS).toString());
try { try {
LocalDateTime now = LocalDateTime.now(); LocalDateTime now = LocalDateTime.now();
long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); long start = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
message.setSendTime(now); message.setSendTime(now);
SingleSendMailResponse response = this.emailClient.singleSendMailWithOptions(request, runtimeOptions); SingleSendMailResponse response = this.emailClient.singleSendMailWithOptions(request, runtimeOptions);
LocalDateTime end = LocalDateTime.now(); LocalDateTime end = LocalDateTime.now();
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start)); message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
System.out.println(toJSONString(response)); System.out.println(toJSONString(response));
if (HttpStatus.OK.value() == response.getStatusCode()) { if (HttpStatus.OK.value() == response.getStatusCode()) {
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); message.setStatus(MessageStatusConstant.SUCCESS);
return true; this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
} else { return true;
message.setErrorCode(response.getStatusCode() + "");
ProviderErrorCodeMappingDO providerErrorCode = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.getStatusCode() + "");
if (ObjectUtil.isNotNull(providerErrorCode)) {
message.setErrorMsg(providerErrorCode.getOriginalMessage());
} else { } else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN); message.setStatus(MessageStatusConstant.FAILED);
message.setErrorCode(response.getStatusCode() + "");
ProviderErrorCodeMappingDO providerErrorCode = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), response.getStatusCode() + "");
if (ObjectUtil.isNotNull(providerErrorCode)) {
message.setErrorMsg(providerErrorCode.getOriginalMessage());
} else {
message.setErrorMsg(MsgPushConstant.ERROR_MSG_UNKNOWN);
}
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel());
return false;
} }
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); } catch (Exception e) {
this.sender.channelProviderConfigService.failureUpdate(message.getProviderType(), message.getChannel()); throw new RuntimeException(e);
return false;
} }
});
Boolean b = null;
try {
b = future.get(3, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return b;
} }
} }

View File

@@ -82,10 +82,12 @@ public class AliyunSmsSender implements SmsSender {
message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start)); message.setCostTime((int) (end.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - start));
System.out.println(toJSONString(response)); System.out.println(toJSONString(response));
if (HttpStatus.OK.value() == response.getStatusCode()) { if (HttpStatus.OK.value() == response.getStatusCode()) {
message.setStatus(MessageStatusConstant.SUCCESS);
this.getDownInfo(response.body.bizId, message); this.getDownInfo(response.body.bizId, message);
this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel()); this.sender.channelProviderConfigService.successUpdate(message.getProviderType(), message.getChannel());
return true; return true;
} else { } else {
message.setStatus(MessageStatusConstant.FAILED);
message.setErrorCode(response.body.code); message.setErrorCode(response.body.code);
message.setErrorMsg(response.body.message); message.setErrorMsg(response.body.message);
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);

View File

@@ -136,6 +136,7 @@ public class TelecomSmsSender implements SmsSender {
return true; return true;
} else { } else {
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + ""); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSendResponse.list.get(0).result + "");
message.setStatus(MessageStatusConstant.FAILED);
message.setErrorCode(telecomSmsSendResponse.list.get(0).result + ""); message.setErrorCode(telecomSmsSendResponse.list.get(0).result + "");
message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage()); message.setErrorMsg(providerErrorCodeMappingDO.getOriginalMessage());
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
@@ -260,6 +261,7 @@ public class TelecomSmsSender implements SmsSender {
TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class); TelecomSmsSelectResponse telecomSmsSelectResponse = JSON.parseObject(response.getBody(), TelecomSmsSelectResponse.class);
TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0); TelecomSmsSelectDetailRes telecomSmsSelectDetailRes = telecomSmsSelectResponse.list.get(0);
if (telecomSmsSelectDetailRes.getStatus() == 5) { if (telecomSmsSelectDetailRes.getStatus() == 5) {
message.setStatus(MessageStatusConstant.FAILED);
ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat()); ProviderErrorCodeMappingDO providerErrorCodeMappingDO = this.sender.providerErrorCodeMappingService.getByProviderErrorCode(message.getProviderType(), message.getChannel(), telecomSmsSelectDetailRes.getStat());
message.setErrorCode(telecomSmsSelectDetailRes.getStat()); message.setErrorCode(telecomSmsSelectDetailRes.getStat());
if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) { if (ObjectUtil.isNotNull(providerErrorCodeMappingDO)) {
@@ -269,6 +271,9 @@ public class TelecomSmsSender implements SmsSender {
} }
this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message); this.sender.messageRetryQueueService.saveOrUpdateRetryMessage(message);
} }
if (telecomSmsSelectDetailRes.getStatus() == 4) {
message.setStatus(MessageStatusConstant.SUCCESS);
}
// } // }
this.scheduledExecutorService.shutdown(); this.scheduledExecutorService.shutdown();
this.scheduledExecutorService = null; this.scheduledExecutorService = null;