配置消息消费size

This commit is contained in:
hzj
2025-04-11 13:39:08 +08:00
parent bfd03b4950
commit 3165f25d73

View File

@@ -16,8 +16,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@@ -45,7 +47,8 @@ public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageData
@Autowired @Autowired
private MessAnalysisFeignClient messAnalysisFeignClient; private MessAnalysisFeignClient messAnalysisFeignClient;
@Value("${rocketmq.consumer_size}")
private Integer consumerSize;
@Resource @Resource
private RedisUtil redisUtil; private RedisUtil redisUtil;
@@ -54,8 +57,14 @@ public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageData
private RocketMqLogFeignClient rocketMqLogFeignClient; private RocketMqLogFeignClient rocketMqLogFeignClient;
private List<MessageDataDTO> messageList = new ArrayList<>(1); private List<MessageDataDTO> messageList = new ArrayList<>();
@PostConstruct
public void validateConfig() {
if (consumerSize == null) {
throw new IllegalStateException("rocketmq.consumer_size 未配置!");
}
this.messageList = new ArrayList<>(consumerSize);
}
@Override @Override
public void onMessage(String baseMessage) { public void onMessage(String baseMessage) {
MessageDataDTO messageDataDTO = JSONObject.parseObject(baseMessage,MessageDataDTO.class); MessageDataDTO messageDataDTO = JSONObject.parseObject(baseMessage,MessageDataDTO.class);
@@ -92,7 +101,7 @@ public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageData
protected void handleMessage(MessageDataDTO message) { protected void handleMessage(MessageDataDTO message) {
synchronized (messageList) { synchronized (messageList) {
messageList.add(message); messageList.add(message);
if (messageList.size() >= 1) { if (messageList.size() >= consumerSize) {
saveToDatabase(); saveToDatabase();
} }
} }
@@ -159,7 +168,7 @@ public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageData
messAnalysisFeignClient.analysis(messageList); messAnalysisFeignClient.analysis(messageList);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
log.info("处理120条消息所需时间------------"+(end-start)); log.info("处理"+consumerSize+"条消息所需时间------------"+(end-start));
}catch (Exception e){{ }catch (Exception e){{
log.info(e.toString()); log.info(e.toString());
} }