修改部分魔法值
This commit is contained in:
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
|
||||
import com.njcn.message.constant.MessageStatus;
|
||||
import com.njcn.message.messagedto.DevComFlagDTO;
|
||||
import com.njcn.message.messagedto.MessageDataDTO;
|
||||
import com.njcn.message.redis.RedisKeyPrefix;
|
||||
import com.njcn.message.websocket.WebSocketServer;
|
||||
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||
@@ -67,7 +68,7 @@ public class DeviceRunFlagDataConsumer extends EnhanceConsumerMessageHandler<Dev
|
||||
public boolean filter(DevComFlagDTO message) {
|
||||
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
|
||||
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
@@ -77,7 +78,7 @@ public class DeviceRunFlagDataConsumer extends EnhanceConsumerMessageHandler<Dev
|
||||
*/
|
||||
@Override
|
||||
protected void consumeSuccess(DevComFlagDTO message) {
|
||||
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
}
|
||||
|
||||
|
||||
@@ -95,7 +96,7 @@ public class DeviceRunFlagDataConsumer extends EnhanceConsumerMessageHandler<Dev
|
||||
*/
|
||||
@Override
|
||||
protected void saveExceptionMsgLog(DevComFlagDTO message, String identity, Exception exception) {
|
||||
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
|
||||
rocketmqMsgErrorLog.setMsgKey(message.getKey());
|
||||
rocketmqMsgErrorLog.setResource(message.getSource());
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
|
||||
|
||||
import com.njcn.message.constant.MessageStatus;
|
||||
import com.njcn.message.messagedto.MessageDataDTO;
|
||||
import com.njcn.message.redis.RedisKeyPrefix;
|
||||
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||
@@ -53,7 +54,7 @@ public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageData
|
||||
private RocketMqLogFeignClient rocketMqLogFeignClient;
|
||||
|
||||
|
||||
private List<MessageDataDTO> messageList = new ArrayList<>(120);
|
||||
private List<MessageDataDTO> messageList = new ArrayList<>(1);
|
||||
|
||||
@Override
|
||||
public void onMessage(String baseMessage) {
|
||||
@@ -73,7 +74,7 @@ public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageData
|
||||
public boolean filter(MessageDataDTO message) {
|
||||
String keyStatus = redisUtil.getStringByKey(message.getKey());
|
||||
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.BEING_PROCESSED, 60L);
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HARMMONIC_TOPIC.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
@@ -83,7 +84,7 @@ public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageData
|
||||
*/
|
||||
@Override
|
||||
protected void consumeSuccess(MessageDataDTO message) {
|
||||
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HARMMONIC_TOPIC.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
}
|
||||
|
||||
|
||||
@@ -91,7 +92,7 @@ public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageData
|
||||
protected void handleMessage(MessageDataDTO message) {
|
||||
synchronized (messageList) {
|
||||
messageList.add(message);
|
||||
if (messageList.size() >= 120) {
|
||||
if (messageList.size() >= 1) {
|
||||
saveToDatabase();
|
||||
}
|
||||
}
|
||||
@@ -106,7 +107,7 @@ public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageData
|
||||
*/
|
||||
@Override
|
||||
protected void saveExceptionMsgLog(MessageDataDTO message, String identity, Exception exception) {
|
||||
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.HARMMONIC_TOPIC.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
|
||||
rocketmqMsgErrorLog.setMsgKey(message.getKey());
|
||||
rocketmqMsgErrorLog.setResource(message.getSource());
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
|
||||
|
||||
import com.njcn.message.constant.MessageStatus;
|
||||
import com.njcn.message.messagedto.MessageDataDTO;
|
||||
import com.njcn.message.redis.RedisKeyPrefix;
|
||||
import com.njcn.message.websocket.WebSocketServer;
|
||||
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||
@@ -61,9 +62,10 @@ public class RealTimeDataConsumer extends EnhanceConsumerMessageHandler<MessageD
|
||||
*/
|
||||
@Override
|
||||
public boolean filter(MessageDataDTO message) {
|
||||
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
|
||||
String keyStatus = redisUtil.getStringByKey(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()));
|
||||
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
|
||||
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
@@ -73,7 +75,7 @@ public class RealTimeDataConsumer extends EnhanceConsumerMessageHandler<MessageD
|
||||
*/
|
||||
@Override
|
||||
protected void consumeSuccess(MessageDataDTO message) {
|
||||
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
}
|
||||
|
||||
|
||||
@@ -94,7 +96,7 @@ public class RealTimeDataConsumer extends EnhanceConsumerMessageHandler<MessageD
|
||||
*/
|
||||
@Override
|
||||
protected void saveExceptionMsgLog(MessageDataDTO message, String identity, Exception exception) {
|
||||
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.REAL_TIME_DATA.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
|
||||
rocketmqMsgErrorLog.setMsgKey(message.getKey());
|
||||
rocketmqMsgErrorLog.setResource(message.getSource());
|
||||
|
||||
@@ -29,9 +29,9 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
|
||||
public void onMessage(Message message, byte[] pattern) {
|
||||
// 用户做自己的业务处理即可,注意message.toString()可以获取失效的key
|
||||
String expiredKey = message.toString();
|
||||
if(expiredKey.contains("AskRealData:")){
|
||||
if(expiredKey.contains(RedisKeyPrefix.ASK_REAL_DATA)){
|
||||
log.info("失效expiredKey:{}", expiredKey);
|
||||
String lineId = expiredKey.replace("AskRealData:","");
|
||||
String lineId = expiredKey.replace(RedisKeyPrefix.ASK_REAL_DATA,"");
|
||||
WebSocketServer.sendInfo("前置连接存在问题",lineId);
|
||||
redisUtil.delete(expiredKey);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
package com.njcn.message.redis;
|
||||
|
||||
/**
|
||||
* Description:
|
||||
* Date: 2025/03/12 下午 4:23【需求编号】
|
||||
*前缀
|
||||
* @author clam
|
||||
* @version V1.0.0
|
||||
*/
|
||||
public interface RedisKeyPrefix {
|
||||
|
||||
String REAL_TIME_DATA = "REAL_TIME_DATA:";
|
||||
String HARMMONIC_TOPIC = "HARMMONIC_TOPIC:";
|
||||
|
||||
String DEVICE_RUN_FLAG = "DEVICE_RUN_FLAG:";
|
||||
|
||||
String ASK_REAL_DATA = "ASK_REAL_DATA:";
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
|
||||
|
||||
import com.njcn.message.message.AskRealDataMessage;
|
||||
import com.njcn.message.produce.template.AskRealDataMessaggeTemplate;
|
||||
import com.njcn.message.redis.RedisKeyPrefix;
|
||||
import com.njcn.middle.rocket.domain.BaseMessage;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -128,7 +129,7 @@ public class WebSocketServer {
|
||||
baseMessage.setMessageBody(JSONObject.toJSONString(askRealDataMessage));
|
||||
// 发送消息到topic1
|
||||
askRealDataMessaggeTemplate.sendMember(baseMessage);
|
||||
redisUtil.saveByKeyWithExpire("AskRealData:".concat(split[1]),"AskRealData:".concat(split[1]),20L);
|
||||
redisUtil.saveByKeyWithExpire(RedisKeyPrefix.ASK_REAL_DATA.concat(split[1]),RedisKeyPrefix.ASK_REAL_DATA.concat(split[1]),20L);
|
||||
log.info("监测点连接:" + userIdAndLineIdAndDevId + ",当前在线监测点数为:" + getOnlineCount());
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user