Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 528f376f6d | |||
| 1d29a03a3c | |||
| 9ea6a00cb5 | |||
| c33490c4fc |
@@ -38,7 +38,6 @@ import lombok.AllArgsConstructor;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
@@ -328,7 +327,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
|||||||
param.setNDid(nDid);
|
param.setNDid(nDid);
|
||||||
param.setStatus(1);
|
param.setStatus(1);
|
||||||
param.setRunStatus(1);
|
param.setRunStatus(1);
|
||||||
|
boolean isConnectDev = DicDataEnum.CONNECT_DEV.getCode().equals(dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevType()).getData().getCode());
|
||||||
|
if (isConnectDev) {
|
||||||
param.setProcess(2);
|
param.setProcess(2);
|
||||||
|
} else {
|
||||||
|
param.setProcess(4);
|
||||||
|
}
|
||||||
csEquipmentDeliveryService.devResetFactory(param);
|
csEquipmentDeliveryService.devResetFactory(param);
|
||||||
//清除关系表
|
//清除关系表
|
||||||
QueryWrapper<CsLedger> csLedgerQueryWrapper = new QueryWrapper<>();
|
QueryWrapper<CsLedger> csLedgerQueryWrapper = new QueryWrapper<>();
|
||||||
@@ -374,12 +378,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
|||||||
@Transactional(rollbackFor = Exception.class)
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public String wlDevRegister(String nDid) {
|
public String wlDevRegister(String nDid) {
|
||||||
String result = "fail";
|
String result = "fail";
|
||||||
// 设备状态判断
|
|
||||||
checkDeviceStatus(nDid);
|
|
||||||
// 询问设备支持的主题信息,并将支持的主题入库
|
|
||||||
askAndStoreTopics(nDid);
|
|
||||||
// MQTT询问装置用的模板,并判断库中是否存在模板
|
|
||||||
checkDeviceModel(nDid);
|
|
||||||
// 根据模板接入设备
|
// 根据模板接入设备
|
||||||
DeviceLogDTO logDto = new DeviceLogDTO();
|
DeviceLogDTO logDto = new DeviceLogDTO();
|
||||||
logDto.setUserName(RequestUtil.getUserNickname());
|
logDto.setUserName(RequestUtil.getUserNickname());
|
||||||
@@ -387,9 +385,14 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
|||||||
logDto.setOperate("便携式设备"+nDid+"注册、接入");
|
logDto.setOperate("便携式设备"+nDid+"注册、接入");
|
||||||
logDto.setResult(1);
|
logDto.setResult(1);
|
||||||
try {
|
try {
|
||||||
|
// 设备状态判断
|
||||||
|
checkDeviceStatus(nDid);
|
||||||
|
// 询问设备支持的主题信息,并将支持的主题入库
|
||||||
|
askAndStoreTopics(nDid);
|
||||||
|
Thread.sleep(2000);
|
||||||
|
// MQTT询问装置用的模板,并判断库中是否存在模板
|
||||||
|
checkDeviceModel(nDid);
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
//获取版本
|
|
||||||
String version = csTopicService.getVersion(nDid);
|
|
||||||
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
|
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
|
||||||
List<CsLinePO> csLinePoList = new ArrayList<>();
|
List<CsLinePO> csLinePoList = new ArrayList<>();
|
||||||
//1.录入装置台账信息
|
//1.录入装置台账信息
|
||||||
@@ -441,6 +444,9 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
|||||||
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
|
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
|
||||||
//5.发起自动接入请求
|
//5.发起自动接入请求
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
|
//先获取版本
|
||||||
|
// String version = csTopicService.getVersion(nDid);
|
||||||
|
String version = "V1";
|
||||||
devAccessAskTemplate(nDid,version,1);
|
devAccessAskTemplate(nDid,version,1);
|
||||||
//6.修改流程,便携式设备接入成功即为实际环境
|
//6.修改流程,便携式设备接入成功即为实际环境
|
||||||
csEquipmentDeliveryService.updateProcessBynDid(nDid,4);
|
csEquipmentDeliveryService.updateProcessBynDid(nDid,4);
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package com.njcn.access.service.impl;
|
package com.njcn.access.service.impl;
|
||||||
|
|
||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
@@ -18,7 +17,10 @@ import lombok.RequiredArgsConstructor;
|
|||||||
import org.springframework.beans.BeanUtils;
|
import org.springframework.beans.BeanUtils;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -151,7 +153,6 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
|
|||||||
QueryWrapper<CsEquipmentDeliveryPO> wrapper = new QueryWrapper<>();
|
QueryWrapper<CsEquipmentDeliveryPO> wrapper = new QueryWrapper<>();
|
||||||
wrapper.select("DISTINCT CONCAT(node_id, node_process) as concatenated");
|
wrapper.select("DISTINCT CONCAT(node_id, node_process) as concatenated");
|
||||||
wrapper.eq("usage_status", 1);
|
wrapper.eq("usage_status", 1);
|
||||||
wrapper.eq("run_status", 2);
|
|
||||||
wrapper.isNotNull("node_id");
|
wrapper.isNotNull("node_id");
|
||||||
return baseMapper.selectObjs(wrapper)
|
return baseMapper.selectObjs(wrapper)
|
||||||
.stream()
|
.stream()
|
||||||
|
|||||||
@@ -11,10 +11,10 @@ import lombok.AllArgsConstructor;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.Arrays;
|
import java.time.LocalDateTime;
|
||||||
import java.util.List;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.Objects;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -33,12 +33,19 @@ public class CsEventServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> im
|
|||||||
@Transactional(rollbackFor = Exception.class)
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public List<String> updateCsEvent(CsEventParam csEventParam) {
|
public List<String> updateCsEvent(CsEventParam csEventParam) {
|
||||||
List<String> eventList = new ArrayList<>();
|
List<String> eventList = new ArrayList<>();
|
||||||
|
|
||||||
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
|
||||||
|
LocalDateTime dateTime = LocalDateTime.parse(csEventParam.getStartTime(), formatter);
|
||||||
|
// 减去1毫秒
|
||||||
|
LocalDateTime newDateTime = dateTime.minusNanos(1000000);
|
||||||
|
String startTime = newDateTime.format(formatter);
|
||||||
|
|
||||||
//1.将波形文件关联事件
|
//1.将波形文件关联事件
|
||||||
LambdaUpdateWrapper<CsEventPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
|
LambdaUpdateWrapper<CsEventPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
|
||||||
lambdaUpdateWrapper.set(CsEventPO::getWavePath,csEventParam.getPath()).eq(CsEventPO::getLineId,csEventParam.getLineId())
|
lambdaUpdateWrapper.set(CsEventPO::getWavePath,csEventParam.getPath()).eq(CsEventPO::getLineId,csEventParam.getLineId())
|
||||||
.eq(CsEventPO::getDeviceId,csEventParam.getDeviceId())
|
.eq(CsEventPO::getDeviceId,csEventParam.getDeviceId())
|
||||||
.in(CsEventPO::getType, Arrays.asList(0,1))
|
.in(CsEventPO::getType, Arrays.asList(0,1))
|
||||||
.between(CsEventPO::getStartTime,csEventParam.getStartTime(),csEventParam.getEndTime());
|
.between(CsEventPO::getStartTime,startTime,csEventParam.getEndTime());
|
||||||
if (Objects.nonNull(csEventParam.getLocation())) {
|
if (Objects.nonNull(csEventParam.getLocation())) {
|
||||||
lambdaUpdateWrapper.eq(CsEventPO::getLocation, csEventParam.getLocation());
|
lambdaUpdateWrapper.eq(CsEventPO::getLocation, csEventParam.getLocation());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,148 @@
|
|||||||
|
package com.njcn.message.consumer;
|
||||||
|
|
||||||
|
import com.njcn.csdevice.api.CsTerminalReplyFeignClient;
|
||||||
|
import com.njcn.csdevice.param.IcdBzReplyParam;
|
||||||
|
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||||
|
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||||
|
import com.njcn.mq.constant.BusinessTopic;
|
||||||
|
import com.njcn.mq.constant.MessageStatus;
|
||||||
|
import com.njcn.mq.message.BzMessage;
|
||||||
|
import com.njcn.redis.pojo.enums.AppRedisKey;
|
||||||
|
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||||
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
|
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||||
|
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 类的介绍:接收前置响应台账更新相关信息
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @version 1.0.0
|
||||||
|
* @createTime 2023/8/11 15:32
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
@RocketMQMessageListener(
|
||||||
|
topic = BusinessTopic.REPLY_TOPIC,
|
||||||
|
consumerGroup = "RECALL",
|
||||||
|
selectorExpression = "RECALL",
|
||||||
|
consumeMode = ConsumeMode.ORDERLY,
|
||||||
|
consumeThreadNumber = 10,
|
||||||
|
enableMsgTrace = true
|
||||||
|
)
|
||||||
|
@Slf4j
|
||||||
|
public class BzConsumer extends EnhanceConsumerMessageHandler<BzMessage> implements RocketMQListener<BzMessage> {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RedisUtil redisUtil;
|
||||||
|
@Resource
|
||||||
|
private RocketMqLogFeignClient rocketMqLogFeignClient;
|
||||||
|
@Resource
|
||||||
|
private CsTerminalReplyFeignClient csTerminalReplyFeignClient;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessage(BzMessage message) {
|
||||||
|
log.info("分发至补召响应处理程序");
|
||||||
|
//收到消息修改(cs_terminal_reply)
|
||||||
|
IcdBzReplyParam param = new IcdBzReplyParam();
|
||||||
|
param.setId(message.getGuid());
|
||||||
|
param.setDeviceId(message.getTerminalId());
|
||||||
|
param.setLineId(message.getMonitorId());
|
||||||
|
param.setCode(message.getCode());
|
||||||
|
param.setMsg(message.getResult());
|
||||||
|
if (param.getCode() == 200) {
|
||||||
|
param.setState(1);
|
||||||
|
} else {
|
||||||
|
param.setState(2);
|
||||||
|
}
|
||||||
|
csTerminalReplyFeignClient.updateBzData(param);
|
||||||
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 通过redis分布式锁判断当前消息所处状态
|
||||||
|
* 1、null 查不到该key的数据,属于第一次消费,放行
|
||||||
|
* 2、fail 上次消息消费时发生异常,放行
|
||||||
|
* 3、being processed 正在处理,打回去
|
||||||
|
* 4、success 最近72小时消费成功,避免重复消费,打回去
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean filter(BzMessage message) {
|
||||||
|
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
|
||||||
|
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||||
|
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消费成功,缓存到redis72小时,避免重复消费
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void consumeSuccess(BzMessage message) {
|
||||||
|
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发生异常时,进行错误信息入库保存
|
||||||
|
* 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void saveExceptionMsgLog(BzMessage message, String identity, Exception exception) {
|
||||||
|
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||||
|
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
|
||||||
|
rocketmqMsgErrorLog.setMsgKey(message.getKey());
|
||||||
|
rocketmqMsgErrorLog.setResource(message.getSource());
|
||||||
|
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
|
||||||
|
//数据库字段配置长度200,避免插入失败,大致分析异常原因
|
||||||
|
String exceptionMsg = exception.getMessage();
|
||||||
|
if(exceptionMsg.length() > 200){
|
||||||
|
exceptionMsg = exceptionMsg.substring(0,180);
|
||||||
|
}
|
||||||
|
rocketmqMsgErrorLog.setRecord(exceptionMsg);
|
||||||
|
//如果是当前消息重试的则略过
|
||||||
|
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
|
||||||
|
//单次消费异常
|
||||||
|
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
|
||||||
|
//重试N次后,依然消费异常
|
||||||
|
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 处理失败后,是否重试
|
||||||
|
* 一般开启
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected boolean isRetry() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 消费失败是否抛出异常,抛出异常后就不再消费了
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected boolean throwException() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 调用父类handler处理消息的元信息
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void onMessage(BzMessage message) {
|
||||||
|
super.dispatchMessage(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -12,6 +12,7 @@ import com.njcn.redis.utils.RedisUtil;
|
|||||||
import com.njcn.system.api.RocketMqLogFeignClient;
|
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||||
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
@@ -31,6 +32,7 @@ import java.util.Objects;
|
|||||||
topic = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
|
topic = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
|
||||||
consumerGroup = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
|
consumerGroup = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
|
consumeMode = ConsumeMode.ORDERLY,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import com.njcn.system.api.RocketMqLogFeignClient;
|
|||||||
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||||
import com.njcn.zlevent.api.EventFeignClient;
|
import com.njcn.zlevent.api.EventFeignClient;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
@@ -31,6 +32,7 @@ import java.util.Objects;
|
|||||||
topic = BusinessTopic.LOG_TOPIC,
|
topic = BusinessTopic.LOG_TOPIC,
|
||||||
consumerGroup = BusinessTopic.LOG_TOPIC,
|
consumerGroup = BusinessTopic.LOG_TOPIC,
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
|
consumeMode = ConsumeMode.ORDERLY,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import com.njcn.redis.utils.RedisUtil;
|
|||||||
import com.njcn.system.api.RocketMqLogFeignClient;
|
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||||
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
@@ -30,6 +31,7 @@ import java.util.Objects;
|
|||||||
topic = BusinessTopic.HEART_BEAT_TOPIC,
|
topic = BusinessTopic.HEART_BEAT_TOPIC,
|
||||||
consumerGroup = BusinessTopic.HEART_BEAT_TOPIC,
|
consumerGroup = BusinessTopic.HEART_BEAT_TOPIC,
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
|
consumeMode = ConsumeMode.ORDERLY,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package com.njcn.message.consumer;
|
|||||||
|
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import com.njcn.csdevice.api.CsTerminalReplyFeignClient;
|
import com.njcn.csdevice.api.CsTerminalReplyFeignClient;
|
||||||
|
import com.njcn.csdevice.param.IcdBzReplyParam;
|
||||||
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||||
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||||
import com.njcn.mq.constant.BusinessTopic;
|
import com.njcn.mq.constant.BusinessTopic;
|
||||||
@@ -13,10 +14,10 @@ import com.njcn.redis.utils.RedisUtil;
|
|||||||
import com.njcn.system.api.RocketMqLogFeignClient;
|
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||||
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -32,7 +33,9 @@ import java.util.Objects;
|
|||||||
@Service
|
@Service
|
||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
topic = BusinessTopic.REPLY_TOPIC,
|
topic = BusinessTopic.REPLY_TOPIC,
|
||||||
consumerGroup = BusinessTopic.REPLY_TOPIC,
|
consumerGroup = "LEDGER",
|
||||||
|
selectorExpression = "LEDGER",
|
||||||
|
consumeMode = ConsumeMode.ORDERLY,
|
||||||
consumeThreadNumber = 10,
|
consumeThreadNumber = 10,
|
||||||
enableMsgTrace = true
|
enableMsgTrace = true
|
||||||
)
|
)
|
||||||
@@ -47,18 +50,23 @@ public class UpdateLedgerConsumer extends EnhanceConsumerMessageHandler<UpdateLe
|
|||||||
private CsTerminalReplyFeignClient csTerminalReplyFeignClient;
|
private CsTerminalReplyFeignClient csTerminalReplyFeignClient;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional(rollbackFor = Exception.class)
|
|
||||||
public void handleMessage(UpdateLedgerMessage updateLedgerMessage) {
|
public void handleMessage(UpdateLedgerMessage updateLedgerMessage) {
|
||||||
log.info("分发至更新台账响应处理程序");
|
log.info("分发至更新台账响应处理程序");
|
||||||
//收到消息修改(cs_terminal_reply)
|
//收到消息修改(cs_terminal_reply)
|
||||||
List<UpdateLedgerMessage.HandleData> data = updateLedgerMessage.getData();
|
List<UpdateLedgerMessage.HandleData> data = updateLedgerMessage.getData();
|
||||||
if (ObjectUtil.isNotEmpty(data)) {
|
if (ObjectUtil.isNotEmpty(data)) {
|
||||||
data.forEach(item->{
|
data.forEach(item->{
|
||||||
|
IcdBzReplyParam param = new IcdBzReplyParam();
|
||||||
|
param.setId(updateLedgerMessage.getGuid());
|
||||||
|
param.setDeviceId(item.getDeviceId());
|
||||||
|
param.setCode(item.getCode());
|
||||||
|
param.setMsg(item.getResult());
|
||||||
if (item.getCode() == 200) {
|
if (item.getCode() == 200) {
|
||||||
csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),1,item.getDeviceId());
|
param.setState(1);
|
||||||
} else {
|
} else {
|
||||||
csTerminalReplyFeignClient.updateData(updateLedgerMessage.getGuid(),2,item.getDeviceId());
|
param.setState(2);
|
||||||
}
|
}
|
||||||
|
csTerminalReplyFeignClient.updateData(param);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user