diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java index 755432a..d553e1d 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/DeviceRunFlagDataConsumer.java @@ -33,7 +33,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = "Device_Run_Flag_Topic", consumerGroup = "Device_Run_Flag_Consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/FrontDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/FrontDataConsumer.java index a60a78c..335b9b1 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/FrontDataConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/FrontDataConsumer.java @@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit; @RocketMQMessageListener( topic = "LN_Topic", consumerGroup = "ln_consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java index 3b8724b..0fe447f 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/FrontHeartBeatConsumer.java @@ -33,7 +33,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = "Heart_Beat_Topic", consumerGroup = "Heartb_Beat_Topic_Consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java index a6036fe..2d09140 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/RealTimeDataConsumer.java @@ -31,7 +31,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = "Real_Time_Data_Topic", consumerGroup = "real_time_consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java index 54194f0..560d6b7 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicLogsConsumer.java @@ -33,7 +33,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = "log_Topic", consumerGroup = "Log_Topic_Consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java index 7e3715b..f01c0d9 100644 --- a/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java +++ b/message/message-boot/src/main/java/com/njcn/message/consumer/TopicReplyConsumer.java @@ -32,7 +32,7 @@ import java.util.Objects; @RocketMQMessageListener( topic = "Topic_Reply_Topic", consumerGroup = "Topic_Reply_Topic_Consumer", - selectorExpression = "Test_Tag||Test_Keys", + selectorExpression = "*", consumeThreadNumber = 10, enableMsgTrace = true ) diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java index 893330d..18745af 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/AskRealDataMessaggeTemplate.java @@ -1,8 +1,10 @@ package com.njcn.message.produce.template; +import com.alibaba.fastjson.JSON; import com.njcn.message.constant.BusinessResource; import com.njcn.message.constant.BusinessTopic; +import com.njcn.message.message.AskRealDataMessage; import com.njcn.middle.rocket.domain.BaseMessage; import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; import org.apache.rocketmq.client.producer.SendResult; @@ -24,7 +26,8 @@ public class AskRealDataMessaggeTemplate extends RocketMQEnhanceTemplate { public SendResult sendMember(BaseMessage askRealDataMessage,String nodeId) { askRealDataMessage.setSource(BusinessResource.WEB_RESOURCE); - askRealDataMessage.setKey("Test_Keys"); - return send(nodeId+"_"+BusinessTopic.ASK_REAL_DATA_TOPIC,"Test_Tag" , askRealDataMessage); + AskRealDataMessage dto = JSON.parseObject(askRealDataMessage.getMessageBody(), AskRealDataMessage.class); + askRealDataMessage.setKey(dto.getLine()); + return send(BusinessTopic.ASK_REAL_DATA_TOPIC,nodeId , askRealDataMessage); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java index f0dd9c1..d5b9338 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/DeviceRebootMessageTemplate.java @@ -1,7 +1,10 @@ package com.njcn.message.produce.template; +import com.alibaba.fastjson.JSON; import com.njcn.message.constant.BusinessResource; import com.njcn.message.constant.BusinessTopic; +import com.njcn.message.message.AskRealDataMessage; +import com.njcn.message.message.DeviceRebootMessage; import com.njcn.middle.rocket.domain.BaseMessage; import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; import org.apache.rocketmq.client.producer.SendResult; @@ -24,7 +27,6 @@ public class DeviceRebootMessageTemplate extends RocketMQEnhanceTemplate { public SendResult sendMember(BaseMessage baseMessage,String nodeId) { baseMessage.setSource(BusinessResource.WEB_RESOURCE); - baseMessage.setKey("Test_Keys"); - return send(nodeId+"_"+BusinessTopic.CONTROL_TOPIC,"Test_Tag", baseMessage); + return send(BusinessTopic.CONTROL_TOPIC,nodeId, baseMessage); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java index 155c510..8645108 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/ProcessRebootMessageTemplate.java @@ -1,7 +1,10 @@ package com.njcn.message.produce.template; +import com.alibaba.fastjson.JSON; import com.njcn.message.constant.BusinessResource; import com.njcn.message.constant.BusinessTopic; +import com.njcn.message.message.AskRealDataMessage; +import com.njcn.message.message.ProcessRebootMessage; import com.njcn.middle.rocket.domain.BaseMessage; import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate; import org.apache.rocketmq.client.producer.SendResult; @@ -24,7 +27,8 @@ public class ProcessRebootMessageTemplate extends RocketMQEnhanceTemplate { public SendResult sendMember(BaseMessage baseMessage,String nodeId) { baseMessage.setSource(BusinessResource.WEB_RESOURCE); - baseMessage.setKey("Test_Keys"); - return send(nodeId+"_"+BusinessTopic.PROCESS_TOPIC,"Test_Tag", baseMessage); + ProcessRebootMessage dto = JSON.parseObject(baseMessage.getMessageBody(), ProcessRebootMessage.class); + baseMessage.setKey(dto.getIndex()+""); + return send(BusinessTopic.PROCESS_TOPIC,nodeId, baseMessage); } } diff --git a/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java b/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java index 34df5ee..f05d3d4 100644 --- a/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java +++ b/message/message-boot/src/main/java/com/njcn/message/produce/template/RecallMessaggeTemplate.java @@ -24,7 +24,6 @@ public class RecallMessaggeTemplate extends RocketMQEnhanceTemplate { public SendResult sendMember(BaseMessage recallMessage,String nodeId) { recallMessage.setSource(BusinessResource.WEB_RESOURCE); - recallMessage.setKey("Test_Keys"); - return send(nodeId+"_"+BusinessTopic.RECALL_TOPIC,"Test_Tag" , recallMessage); + return send(BusinessTopic.RECALL_TOPIC,nodeId , recallMessage); } }