新增云协议相关内容
This commit is contained in:
@@ -1,9 +1,16 @@
|
||||
package com.njcn.zlevent.api;
|
||||
|
||||
import com.njcn.common.pojo.annotation.OperateInfo;
|
||||
import com.njcn.common.pojo.constant.ServerInfo;
|
||||
import com.njcn.common.pojo.enums.common.LogEnum;
|
||||
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.response.HttpResult;
|
||||
import com.njcn.common.utils.HttpResultUtil;
|
||||
import com.njcn.mq.message.AppEventMessage;
|
||||
import com.njcn.mq.message.CldLogMessage;
|
||||
import com.njcn.zlevent.api.fallback.EventClientFallbackFactory;
|
||||
import io.swagger.annotations.ApiImplicitParam;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
@@ -20,4 +27,7 @@ public interface EventFeignClient {
|
||||
@PostMapping("/portableData")
|
||||
HttpResult<String> getPortableData(@RequestBody AppEventMessage appEventMessage);
|
||||
|
||||
@PostMapping("/cldEventData")
|
||||
HttpResult<String> getCldEventData(@RequestBody CldLogMessage cldLogMessage);
|
||||
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.common.pojo.response.HttpResult;
|
||||
import com.njcn.mq.message.AppEventMessage;
|
||||
import com.njcn.mq.message.CldLogMessage;
|
||||
import com.njcn.zlevent.api.EventFeignClient;
|
||||
import feign.hystrix.FallbackFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -37,6 +38,12 @@ public class EventClientFallbackFactory implements FallbackFactory<EventFeignCli
|
||||
throw new BusinessException(finalExceptionEnum);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResult<String> getCldEventData(CldLogMessage cldLogMessage) {
|
||||
log.error("{}异常,降级处理,异常为:{}","云前置事件处理",cause.toString());
|
||||
throw new BusinessException(finalExceptionEnum);
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.response.HttpResult;
|
||||
import com.njcn.common.utils.HttpResultUtil;
|
||||
import com.njcn.mq.message.AppEventMessage;
|
||||
import com.njcn.mq.message.CldLogMessage;
|
||||
import com.njcn.web.controller.BaseController;
|
||||
import com.njcn.zlevent.service.IEventService;
|
||||
import io.swagger.annotations.Api;
|
||||
@@ -54,4 +55,14 @@ public class EventController extends BaseController {
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
|
||||
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||
@PostMapping("/cldEventData")
|
||||
@ApiOperation("云前置事件处理")
|
||||
@ApiImplicitParam(name = "cldLogMessage", value = "数据实体", required = true)
|
||||
public HttpResult<String> getCldEventData(@RequestBody CldLogMessage cldLogMessage){
|
||||
String methodDescribe = getMethodDescribe("getCldEventData");
|
||||
eventService.getCldEventData(cldLogMessage);
|
||||
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.njcn.zlevent.service;
|
||||
|
||||
import com.njcn.mq.message.AppEventMessage;
|
||||
import com.njcn.mq.message.CldLogMessage;
|
||||
|
||||
/**
|
||||
* @author xy
|
||||
@@ -25,4 +26,11 @@ public interface IEventService {
|
||||
*/
|
||||
void getPortableData(AppEventMessage appEventMessage);
|
||||
|
||||
/**
|
||||
* 云前置设备基础数据
|
||||
* 1.装置发起数据记录开始动作,库中新增数据;
|
||||
* @param cldLogMessage
|
||||
*/
|
||||
void getCldEventData( CldLogMessage cldLogMessage);
|
||||
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ import com.njcn.csharmonic.pojo.po.CsEventPO;
|
||||
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
|
||||
import com.njcn.influx.utils.InfluxDbUtils;
|
||||
import com.njcn.mq.message.AppEventMessage;
|
||||
import com.njcn.mq.message.CldLogMessage;
|
||||
import com.njcn.redis.pojo.enums.AppRedisKey;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.stat.enums.StatResponseEnum;
|
||||
@@ -258,6 +259,36 @@ public class EventServiceImpl implements IEventService {
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getCldEventData(CldLogMessage cldLogMessage) {
|
||||
CsEventPO po = new CsEventPO();
|
||||
po.setStartTime(LocalDateTime.parse(cldLogMessage.getTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
|
||||
po.setTag(cldLogMessage.getLog());
|
||||
po.setClDid(1);
|
||||
po.setLevel(3);
|
||||
po.setProcess(4);
|
||||
po.setCode(cldLogMessage.getCode());
|
||||
//前置告警
|
||||
if (Objects.equals(cldLogMessage.getLevel(),"process")) {
|
||||
//这边将前置服务器id当作设备id
|
||||
po.setDeviceId(cldLogMessage.getNodeId());
|
||||
po.setClDid(Integer.valueOf(cldLogMessage.getProcessNo()));
|
||||
po.setType(4);
|
||||
}
|
||||
//设备和监测点告警
|
||||
else {
|
||||
if (Objects.equals(cldLogMessage.getLevel(),"terminal")) {
|
||||
po.setDeviceId(cldLogMessage.getBusinessId());
|
||||
} else {
|
||||
CsLinePO line = csLineFeignClient.getById(cldLogMessage.getBusinessId()).getData();
|
||||
po.setDeviceId(line.getDeviceId());
|
||||
po.setLineId(cldLogMessage.getBusinessId());
|
||||
}
|
||||
po.setType(3);
|
||||
}
|
||||
csEventService.save(po);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理电压
|
||||
* @param vol
|
||||
|
||||
@@ -0,0 +1,134 @@
|
||||
package com.njcn.message.consumer;
|
||||
|
||||
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||
import com.njcn.mq.constant.BusinessTopic;
|
||||
import com.njcn.mq.constant.MessageStatus;
|
||||
import com.njcn.mq.message.CldLogMessage;
|
||||
import com.njcn.redis.pojo.enums.AppRedisKey;
|
||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||
import com.njcn.zlevent.api.EventFeignClient;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 类的介绍: 云前置心跳消息
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2025/9/16
|
||||
*/
|
||||
@Service
|
||||
@RocketMQMessageListener(
|
||||
topic = BusinessTopic.LOG_TOPIC,
|
||||
consumerGroup = BusinessTopic.LOG_TOPIC,
|
||||
consumeThreadNumber = 10,
|
||||
enableMsgTrace = true
|
||||
)
|
||||
@Slf4j
|
||||
public class CldEventConsumer extends EnhanceConsumerMessageHandler<CldLogMessage> implements RocketMQListener<CldLogMessage> {
|
||||
|
||||
@Resource
|
||||
private RedisUtil redisUtil;
|
||||
@Resource
|
||||
private RocketMqLogFeignClient rocketMqLogFeignClient;
|
||||
@Resource
|
||||
private EventFeignClient eventFeignClient;
|
||||
|
||||
@Override
|
||||
protected void handleMessage(CldLogMessage cldLogMessage) {
|
||||
log.info("分发至云前置告警事件处理");
|
||||
eventFeignClient.getCldEventData(cldLogMessage);
|
||||
}
|
||||
|
||||
|
||||
/***
|
||||
* 通过redis分布式锁判断当前消息所处状态
|
||||
* 1、null 查不到该key的数据,属于第一次消费,放行
|
||||
* 2、fail 上次消息消费时发生异常,放行
|
||||
* 3、being processed 正在处理,打回去
|
||||
* 4、success 最近72小时消费成功,避免重复消费,打回去
|
||||
*/
|
||||
@Override
|
||||
public boolean filter(CldLogMessage message) {
|
||||
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
|
||||
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 消费成功,缓存到redis72小时,避免重复消费
|
||||
*/
|
||||
@Override
|
||||
protected void consumeSuccess(CldLogMessage message) {
|
||||
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发生异常时,进行错误信息入库保存
|
||||
* 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存
|
||||
*/
|
||||
@Override
|
||||
protected void saveExceptionMsgLog(CldLogMessage message, String identity, Exception exception) {
|
||||
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
|
||||
rocketmqMsgErrorLog.setMsgKey(message.getKey());
|
||||
rocketmqMsgErrorLog.setResource(message.getSource());
|
||||
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
|
||||
//数据库字段配置长度200,避免插入失败,大致分析异常原因
|
||||
String exceptionMsg = exception.getMessage();
|
||||
if(exceptionMsg.length() > 200){
|
||||
exceptionMsg = exceptionMsg.substring(0,180);
|
||||
}
|
||||
rocketmqMsgErrorLog.setRecord(exceptionMsg);
|
||||
//如果是当前消息重试的则略过
|
||||
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
|
||||
//单次消费异常
|
||||
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||
}
|
||||
} else {
|
||||
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
|
||||
//重试N次后,依然消费异常
|
||||
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/***
|
||||
* 处理失败后,是否重试
|
||||
* 一般开启
|
||||
*/
|
||||
@Override
|
||||
protected boolean isRetry() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/***
|
||||
* 消费失败是否抛出异常,抛出异常后就不再消费了
|
||||
*/
|
||||
@Override
|
||||
protected boolean throwException() {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/***
|
||||
* 调用父类handler处理消息的元信息
|
||||
*/
|
||||
@Override
|
||||
public void onMessage(CldLogMessage cldLogMessage) {
|
||||
super.dispatchMessage(cldLogMessage);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user