初始版本提交

This commit is contained in:
hzj
2025-01-17 13:35:20 +08:00
parent 862c6ee672
commit 92f98231f2
59 changed files with 11344 additions and 2 deletions

View File

@@ -17,5 +17,22 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-db</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-microservice</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,30 @@
package com.njcn.message.api;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.message.api.fallback.ProduceFeignClientFallbackFactory;
import com.njcn.message.message.DeviceRebootMessage;
import com.njcn.message.message.RecallMessage;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
* Description:
* Date: 2025/01/15 下午 3:52【需求编号】
*
* @author clam
* @version V1.0.0
*/
@FeignClient(
value = "message-boot",
path = "/produce",
fallbackFactory = ProduceFeignClientFallbackFactory.class,
contextId = "message")
public interface ProduceFeignClient {
@PostMapping("/recall")
public HttpResult<Boolean> recall(@RequestBody RecallMessage message);
@PostMapping("/askRestartDevice")
public HttpResult<Boolean> askRestartDevice(@RequestBody DeviceRebootMessage message);
}

View File

@@ -0,0 +1,51 @@
package com.njcn.message.api.fallback;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.message.api.ProduceFeignClient;
import com.njcn.message.message.DeviceRebootMessage;
import com.njcn.message.message.RecallMessage;
import com.njcn.message.utils.MessageEnumUtil;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* Description:
* Date: 2025/01/15 下午 3:55【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Slf4j
@Component
public class ProduceFeignClientFallbackFactory implements FallbackFactory<ProduceFeignClient> {
@Override
public ProduceFeignClient create(Throwable throwable) {
//判断抛出异常是否为解码器抛出的业务异常
Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
if (throwable.getCause() instanceof BusinessException) {
BusinessException businessException = (BusinessException) throwable.getCause();
exceptionEnum = MessageEnumUtil.getExceptionEnum(businessException.getResult());
}
Enum<?> finalExceptionEnum = exceptionEnum;
return new ProduceFeignClient() {
@Override
public HttpResult<Boolean> recall(RecallMessage message) {
log.error("{}异常,降级处理,异常为:{}", "数据补招消息推送", throwable.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<Boolean> askRestartDevice(DeviceRebootMessage message) {
log.error("{}异常,降级处理,异常为:{}", "消息数据解析", throwable.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -0,0 +1,18 @@
package com.njcn.message.constant;
/**
* @author xy
*/
public interface BusinessResource {
/***
* App
*/
String APP_RESOURCE = "APP";
/***
* Web
*/
String WEB_RESOURCE = "WEB";
}

View File

@@ -0,0 +1,88 @@
package com.njcn.message.constant;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月10日 15:30
*/
public interface BusinessTopic {
/**
* 治理主送推送数据接收主题
*/
String NJCN_APP_AUTO_DATA_TOPIC = "njcnAppAutoDataTopic";
/**
* 治理事件接收主题
*/
String NJCN_APP_EVENT_TOPIC = "njcnAppEventTopic";
/**
* 治理告警接收主题
*/
String NJCN_APP_WARN_TOPIC = "njcnAppWarnTopic";
/**
* 治理文件接收主题
*/
String NJCN_APP_FILE_TOPIC = "njcnAppFileTopic";
String NJCN_APP_FILE_STREAM_TOPIC = "njcnAppFileStreamTopic";
String CONTROL_TOPIC = "control_Topic";
String ASK_REAL_DATA_TOPIC = "ask_real_data_topic";
String RECALL_TOPIC = "recall_Topic";
/********************************数据中心*********************************/
String RMP_EVENT_DETAIL_TOPIC = "rmpEventDetailTopic";
interface AppDataTag {
/**
* 统计数据tag
*/
String STAT_TAG = "stat";
/**
* 实时数据tag
*/
String RT_TAG = "rt";
}
interface AppEventTag {
/**
* 暂态事件(录波)tag
*/
String EVENT_TAG = "event";
}
interface AppWarnTag {
/**
* 告警事件tag
*/
String WARN_TAG = "warn";
}
interface FileTag {
/**
* 文件信息tag
*/
String INFO_TAG = "fileInfo";
/**
* 文件流tag
*/
String STREAM_TAG = "streamInfo";
}
}

View File

@@ -0,0 +1,31 @@
package com.njcn.message.constant;
/**
* Description:
* Date: 2024/12/17 10:04【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface DeviceRebootType {
/***
* 1. 新增终端台账
*/
String ADD_TERMINAL = "add_terminal";
/***
* 1. 删除终端台账
*/
String DELETE_TERMINAL = "delete_terminal";
/***
*修改终端 新增/删除/监测点台账
*/
String LEDGER_MODIFY = "ledger_modify";
/**
* Icd变更
*/
String ICD_CHANGE = "icd_change";
}

View File

@@ -0,0 +1,18 @@
package com.njcn.message.constant;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月18日 11:31
*/
public interface MessageStatus {
/***
* 状态为 成功、处理中时,打回不处理
* 状态为 失败,可以继续处理
*/
String SUCCESS = "success";
String BEING_PROCESSED = "being processed";
String FAIL = "fail";
}

View File

@@ -0,0 +1,31 @@
package com.njcn.message.enums;
import lombok.Getter;
/**
* Description:
* Date: 2024/11/13 15:24【需求编号】
*01:稳态02:短时闪变03:长时闪变04:暂态05:告警
* @author clam
* @version V1.0.0
*/
@Getter
public enum DataTypeEnum {
HARMONIC(1, "稳态"),
FLUC(2, "短时闪变"),
PLT(3, "长时闪变"),
EVENT(4, "暂态"),
ALARM(5, "告警"),;
private final Integer code;
private final String message;
DataTypeEnum(Integer code, String message) {
this.code = code;
this.message = message;
}
}

View File

@@ -0,0 +1,23 @@
package com.njcn.message.enums;
import lombok.Getter;
/**
* @author hongawen
* @version 1.0.0
* @date 2021年12月20日 09:56
*/
@Getter
public enum MessageResponseEnum {
MESSAGE_COMMON_ERROR("A00550","消息模块异常"),
;
private final String code;
private final String message;
MessageResponseEnum(String code, String message) {
this.code = code;
this.message = message;
}
}

View File

@@ -0,0 +1,20 @@
package com.njcn.message.message;
import lombok.Data;
/**
* Description:
* Date: 2024/12/13 10:15【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class AskRealDataMessage {
private String devSeries;
private String line;
private Boolean realData = true;
private Boolean soeData = true;
//默认要60s数据3s*20所以这里默认20
private Integer limit = 20;
}

View File

@@ -0,0 +1,67 @@
package com.njcn.message.message;
import lombok.Data;
import java.util.List;
/**
* Description:
* Date: 2024/12/13 10:15【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class DeviceRebootMessage {
private String code;
private DeviceInfo data;
@Data
public static class DeviceInfo {
//终端索引
private String id;
//终端ip
private String ip;
//终端型号
private String devType;
//挂载单位
private String org_name;
//组织名称
private String port;
//终端端口
private String stationName;
//变电站名
private String name;
//终端序列号
private String updateTime;
//数据更新时间
private String manufacturer;
//终端厂商
private String status;
//终端状态
private String series;
//终端识别码
private String devKey;
//
private List<MonitorInfo> monitorData;
@Data
public static class MonitorInfo {
//监测点索引
private String id;
//名称
private String name;
//监测点逻辑号
private String lineNo;
//监测点电压等级
private String voltageLevel;
//监测点接线方式
private String ptType;
private String status;
}
}
}

View File

@@ -0,0 +1,34 @@
package com.njcn.message.message;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.List;
/**
* Description:
* Date: 2024/11/7 15:05【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class RecallMessage {
private List<RecallDTO> data;
@Data
public static class RecallDTO {
/**
* monitorId : ["id1","id2","id3"]
* dataType : 稳态
* timeInterval : ["2024-01-01 12:52:51~2024-01-01 13:53:00","2024-01-02 12:52:51~2024-01-02 13:53:00","2024-01-03 12:52:51~2024-01-03 13:53:00","2024-01-04 12:52:51~2024-01-04 13:53:00"]
*/
@ApiModelProperty("0/1稳态/暂态)")
private String dataType;
private List<String> monitorId;
private List<String> timeInterval;
}
}

View File

@@ -0,0 +1,21 @@
package com.njcn.message.messagedto;
import com.njcn.middle.rocket.domain.BaseMessage;
import lombok.Data;
import java.io.Serializable;
/**
* Description:
* Date: 2024/11/8 14:09【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class MessageDataDTO extends BaseMessage implements Serializable {
private Integer dataType;
private String monitor;
private String value;
}

View File

@@ -0,0 +1,46 @@
package com.njcn.message.utils;
import cn.hutool.core.util.StrUtil;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.EnumUtils;
import com.njcn.message.enums.MessageResponseEnum;
import javax.validation.constraints.NotNull;
import java.util.Objects;
/**
* @author hongawen
* @version 1.0.0
* @date 2021年12月20日 10:03
*/
public class MessageEnumUtil {
/**
* 获取HarmonicResponseEnum实例
*/
public static MessageResponseEnum getMessageEnumResponseEnumByMessage(@NotNull Object value) {
MessageResponseEnum harmonicResponseEnum;
try {
String message = value.toString();
if(message.indexOf(StrUtil.C_COMMA)>0){
value = message.substring(message.indexOf(StrUtil.C_COMMA)+1);
}
harmonicResponseEnum = EnumUtils.valueOf(MessageResponseEnum.class, value, MessageResponseEnum.class.getMethod(BusinessException.GET_MESSAGE_METHOD));
return Objects.isNull(harmonicResponseEnum) ? MessageResponseEnum.MESSAGE_COMMON_ERROR : harmonicResponseEnum;
} catch (NoSuchMethodException e) {
throw new BusinessException(CommonResponseEnum.INTERNAL_ERROR);
}
}
public static Enum<?> getExceptionEnum(HttpResult<Object> result){
//如果返回错误,且为内部错误,则直接抛出异常
CommonResponseEnum commonResponseEnum = EnumUtils.getCommonResponseEnumByCode(result.getCode());
if (commonResponseEnum == CommonResponseEnum.DEVICE_RESPONSE_ENUM) {
return getMessageEnumResponseEnumByMessage(result.getMessage());
}
return commonResponseEnum;
}
}

View File

@@ -0,0 +1,38 @@
package com.njcn.middle.rocket.domain;
import cn.hutool.core.util.IdUtil;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author hongawen
* @version 1.0.0
* @date 2023年08月04日 10:53
*/
@Data
public class BaseMessage {
/**
* 业务键用于RocketMQ控制台查看消费情况
* 为了保证唯一性使用雪花算法生成一个随机id
*/
protected String key = IdUtil.getSnowflake().nextIdStr();
/**
* 发送消息来源,用于排查问题
*/
protected String source = "";
/**
* 发送时间
*/
protected LocalDateTime sendTime = LocalDateTime.now();
/**
* 重试次数,用于判断重试次数,超过重试次数发送异常警告
*/
protected Integer retryTimes = 0;
protected String messageBoy;
}

View File

@@ -38,6 +38,74 @@
<artifactId>mybatis-spring</artifactId>
<version>2.0.5</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-swagger</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-db</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-redis</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>rocket-mq-springboot-starter</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>com.njcn.platform</groupId>
<artifactId>message-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.njcn.platform</groupId>
<artifactId>stat-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.njcn.platform</groupId>
<artifactId>event-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.njcn.platform</groupId>
<artifactId>rt-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>pq-device-api</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<artifactId>pqs-influx</artifactId>
<groupId>com.njcn</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.7.12</version>
</dependency>
</dependencies>
<build>

View File

@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.DependsOn;
/**
@@ -12,6 +13,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
@Slf4j
@EnableFeignClients(basePackages = "com.njcn")
@SpringBootApplication(scanBasePackages = "com.njcn")
@DependsOn("proxyMapperRegister")
public class MessageBootApplication {
public static void main(String[] args) {

View File

@@ -0,0 +1,174 @@
package com.njcn.message.consumer;
import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.MessageStatus;
import com.njcn.message.messagedto.MessageDataDTO;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.api.MessAnalysisFeignClient;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/11 15:32
*/
@Component
@RocketMQMessageListener(
topic = "LN_Topic",
consumerGroup = "ln_consumer",
selectorExpression = "Test_Tag||Test_Keys",
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class FrontDataConsumer extends EnhanceConsumerMessageHandler<MessageDataDTO> implements RocketMQListener<String> {
@Autowired
private MessAnalysisFeignClient messAnalysisFeignClient;
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
private List<MessageDataDTO> messageList = new ArrayList<>(120);
@Override
public void onMessage(String baseMessage) {
MessageDataDTO messageDataDTO = JSONObject.parseObject(baseMessage,MessageDataDTO.class);
super.dispatchMessage(messageDataDTO);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(MessageDataDTO message) {
String keyStatus = redisUtil.getStringByKey(message.getKey());
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.BEING_PROCESSED, 60L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(MessageDataDTO message) {
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
@Override
protected void handleMessage(MessageDataDTO message) {
synchronized (messageList) {
messageList.add(message);
if (messageList.size() >= 120) {
saveToDatabase();
}
}
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(MessageDataDTO message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return false;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
//50个消息做一组插入数据库
public void saveToDatabase(){
try {
long start = System.currentTimeMillis();
messAnalysisFeignClient.analysis(messageList);
long end = System.currentTimeMillis();
log.info("处理120条消息所需时间------------"+(end-start));
}catch (Exception e){{
log.info(e.toString());
}
}finally{
messageList.clear();
}
}
}

View File

@@ -0,0 +1,142 @@
package com.njcn.message.consumer;
import com.alibaba.fastjson.JSONObject;
import com.njcn.message.constant.MessageStatus;
import com.njcn.message.messagedto.MessageDataDTO;
import com.njcn.message.websocket.WebSocketServer;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
* Description:
* Date: 2024/12/13 10:06【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
@RocketMQMessageListener(
topic = "Real_Time_Data_Topic",
consumerGroup = "real_time_consumer",
selectorExpression = "Test_Tag||Test_Keys",
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class RealTimeDataConsumer extends EnhanceConsumerMessageHandler<MessageDataDTO> implements RocketMQListener<String> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Override
public void onMessage(String message) {
MessageDataDTO messageDataDTO = JSONObject.parseObject(message,MessageDataDTO.class);
super.dispatchMessage(messageDataDTO);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(MessageDataDTO message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 30L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(MessageDataDTO message) {
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
@Override
protected void handleMessage(MessageDataDTO message) {
String lineId = message.getMonitor();
WebSocketServer.sendInfo(JSONObject.toJSONString(message),lineId);
//删除推送的lineId
redisUtil.delete("AskRealData:".concat(lineId));
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(MessageDataDTO message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
}

View File

@@ -0,0 +1,62 @@
package com.njcn.message.produce;
import com.alibaba.fastjson.JSONObject;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.message.message.DeviceRebootMessage;
import com.njcn.message.message.RecallMessage;
import com.njcn.message.produce.template.DeviceRebootMessageTemplate;
import com.njcn.message.produce.template.RecallMessaggeTemplate;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
/**
* Description:
* Date: 2025/01/15 下午 7:21【需求编号】
*
* @author clam
* @version V1.0.0
*/
@RestController
@RequiredArgsConstructor
@Api(tags = "消息生产者")
@RequestMapping("/produce")
public class ProduceController extends BaseController {
private final RecallMessaggeTemplate recallMessaggeTemplate;
private final DeviceRebootMessageTemplate deviceRebootMessageTemplate;
@PostMapping("/recall")
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("数据补招消息推送")
@ApiImplicitParam(name = "param", value = "参数", required = true)
public HttpResult<Boolean> recall(@RequestParam RecallMessage message){
String methodDescribe = getMethodDescribe("recall");
BaseMessage baseMessage = new BaseMessage();
baseMessage.setMessageBoy(JSONObject.toJSONString(message));
recallMessaggeTemplate.sendMember(baseMessage);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@PostMapping("/askRestartDevice")
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("请求前置重启进程消息推送")
@ApiImplicitParam(name = "param", value = "参数", required = true)
public HttpResult<Boolean> askRestartDevice(@RequestBody DeviceRebootMessage message){
String methodDescribe = getMethodDescribe("recall");
BaseMessage baseMessage = new BaseMessage();
baseMessage.setMessageBoy(JSONObject.toJSONString(message));
deviceRebootMessageTemplate.sendMember(baseMessage);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -0,0 +1,30 @@
package com.njcn.message.produce.template;
import com.njcn.message.constant.BusinessResource;
import com.njcn.message.constant.BusinessTopic;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;
/**
* Description:
* Date: 2024/12/13 15:15【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
public class AskRealDataMessaggeTemplate extends RocketMQEnhanceTemplate {
public AskRealDataMessaggeTemplate(RocketMQTemplate template) {
super(template);
}
public SendResult sendMember(BaseMessage askRealDataMessage) {
askRealDataMessage.setSource(BusinessResource.WEB_RESOURCE);
askRealDataMessage.setKey("Test_Keys");
return send(BusinessTopic.ASK_REAL_DATA_TOPIC,"Test_Tag" , askRealDataMessage);
}
}

View File

@@ -0,0 +1,30 @@
package com.njcn.message.produce.template;
import com.njcn.message.constant.BusinessResource;
import com.njcn.message.constant.BusinessTopic;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/8/11 15:28
*/
@Component
public class DeviceRebootMessageTemplate extends RocketMQEnhanceTemplate {
public DeviceRebootMessageTemplate(RocketMQTemplate template) {
super(template);
}
public SendResult sendMember(BaseMessage baseMessage) {
baseMessage.setSource(BusinessResource.WEB_RESOURCE);
baseMessage.setKey("Test_Keys");
return send(BusinessTopic.CONTROL_TOPIC,"Test_Tag", baseMessage);
}
}

View File

@@ -0,0 +1,30 @@
package com.njcn.message.produce.template;
import com.njcn.message.constant.BusinessResource;
import com.njcn.message.constant.BusinessTopic;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.middle.rocket.template.RocketMQEnhanceTemplate;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;
/**
* Description:
* Date: 2024/12/13 15:15【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
public class RecallMessaggeTemplate extends RocketMQEnhanceTemplate {
public RecallMessaggeTemplate(RocketMQTemplate template) {
super(template);
}
public SendResult sendMember(BaseMessage recallMessage) {
recallMessage.setSource(BusinessResource.WEB_RESOURCE);
recallMessage.setKey("Test_Keys");
return send(BusinessTopic.RECALL_TOPIC,"Test_Tag" , recallMessage);
}
}

View File

@@ -0,0 +1,156 @@
//package com.njcn.message.websocket;
//
//import cn.hutool.core.date.DatePattern;
//import cn.hutool.core.date.LocalDateTimeUtil;
//import com.alibaba.fastjson.JSONObject;
//import com.njcn.common.pojo.annotation.OperateInfo;
//import com.njcn.common.pojo.enums.common.LogEnum;
//import com.njcn.common.pojo.enums.response.CommonResponseEnum;
//import com.njcn.common.pojo.response.HttpResult;
//import com.njcn.common.utils.HttpResultUtil;
//
//import com.njcn.dataProcess.api.DataVFeignClient;
//import com.njcn.message.message.RecallMessage;
//import com.njcn.message.produce.template.RecallMessaggeTemplate;
//import com.njcn.middle.rocket.domain.BaseMessage;
//import com.njcn.web.controller.BaseController;
//import io.swagger.annotations.Api;
//import io.swagger.annotations.ApiImplicitParam;
//import io.swagger.annotations.ApiOperation;
//import lombok.RequiredArgsConstructor;
//import org.springframework.util.CollectionUtils;
//import org.springframework.web.bind.annotation.PostMapping;
//import org.springframework.web.bind.annotation.RequestBody;
//import org.springframework.web.bind.annotation.RequestMapping;
//import org.springframework.web.bind.annotation.RestController;
//
//import java.time.LocalDate;
//import java.time.LocalDateTime;
//import java.time.LocalTime;
//import java.time.format.DateTimeFormatter;
//import java.util.ArrayList;
//import java.util.List;
//import java.util.stream.Collectors;
//import java.util.stream.Stream;
//
//
///**
// * @author hongawen
// */
//@RestController
//@RequiredArgsConstructor
//@Api(tags = "数据补招")
//@RequestMapping("/data")
//public class DataRecallController extends BaseController {
//
// private final DataVFeignClient dataVFeignClient;
// private final CommTerminalGeneralClient commTerminalGeneralClient;
// private final RecallMessaggeTemplate recallMessaggeTemplate;
//
//
//
// /**
// * 算法保存
// */
// @PostMapping("/recall")
// @OperateInfo(info = LogEnum.BUSINESS_COMMON)
// @ApiOperation("数据补招")
// @ApiImplicitParam(name = "param", value = "参数", required = true)
// public HttpResult<Boolean> recall(@RequestBody List<RecallMessage.RecallDTO> param) {
// String methodDescribe = getMethodDescribe("recall");
// RecallMessage message = new RecallMessage();
// BaseMessage baseMessage = new BaseMessage();
//
// if(CollectionUtils.isEmpty(param)){
// LocalDate localDate =LocalDate.now().plusDays(-1);
// List<String> runMonitorIds = commTerminalGeneralClient.getRunMonitorIds().getData();
// List<RecallMessage.RecallDTO> recallDTOList = new ArrayList<>();
// runMonitorIds.forEach(temp->{
// LineDevGetDTO data = commTerminalGeneralClient.getMonitorDetail(temp).getData();
// Integer timeInterval = data.getTimeInterval();
// List<LocalDateTime> localDateTimeList = generateTimeIntervals(localDate, timeInterval);
// List<LocalDateTime> data1 = dataVFeignClient.monitoringTime(temp, LocalDateTimeUtil.format(localDate, DatePattern.NORM_DATE_PATTERN)).getData();
// localDateTimeList.removeAll(data1);
// if(!CollectionUtils.isEmpty(localDateTimeList)){
// List<String> timePeriod = mergeTimeIntervals(localDateTimeList, timeInterval);
// RecallMessage.RecallDTO recallDTO = new RecallMessage.RecallDTO();
// recallDTO.setDataType("0");
// recallDTO.setMonitorId(Stream.of(temp).collect(Collectors.toList()));
// recallDTO.setTimeInterval(timePeriod);
//
// recallDTOList.add(recallDTO);
// }
// });
// if(!CollectionUtils.isEmpty(recallDTOList)){
//
// message.setData(recallDTOList);
// baseMessage.setMessageBoy(JSONObject.toJSONString(message));
//
// recallMessaggeTemplate.sendMember(baseMessage);
// }
// }else {
// message.setData(param);
// baseMessage.setMessageBoy(JSONObject.toJSONString(message));
// recallMessaggeTemplate.sendMember(baseMessage);
// }
//
//
// return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
//
// }
//
// public List<LocalDateTime> generateTimeIntervals(LocalDate date, int intervalMinutes) {
// List<LocalDateTime> dateTimeList = new ArrayList<>();
//
// // Create the starting LocalDateTime
// LocalDateTime startDateTime = LocalDateTime.of(date, LocalTime.MIDNIGHT);
//
// // Create the ending LocalDateTime
// LocalDateTime endDateTime = LocalDateTime.of(date, LocalTime.MAX);
//
// // Generate LocalDateTime list with the given interval
// LocalDateTime currentDateTime = startDateTime;
// while (!currentDateTime.isAfter(endDateTime)) {
// dateTimeList.add(currentDateTime);
// currentDateTime = currentDateTime.plusMinutes(intervalMinutes);
// }
//
// return dateTimeList;
// }
//
// public static List<String> mergeTimeIntervals(List<LocalDateTime> times, int intervalMinutes) {
// List<String> mergedIntervals = new ArrayList<>();
// if (times == null || times.isEmpty()) {
// return mergedIntervals;
// }
//
// // Sort the list to ensure the times are in order
// times.sort(LocalDateTime::compareTo);
//
// LocalDateTime start = times.get(0);
// LocalDateTime end = start;
//
// for (int i = 1; i < times.size(); i++) {
// LocalDateTime current = times.get(i);
// if (current.isAfter(end.plusMinutes(intervalMinutes))) {
// // If the current time is more than interval minutes after the end, close the current interval
// mergedIntervals.add(formatInterval(start, end));
// start = current; // Start a new interval
// }
// end = current; // Update the end of the current interval
// }
//
// // Add the last interval
// mergedIntervals.add(formatInterval(start, end));
//
// return mergedIntervals;
// }
//
// private static String formatInterval(LocalDateTime start, LocalDateTime end) {
// DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// return start.format(formatter) + "~" + end.format(formatter);
// }
//
//
//
//}

View File

@@ -0,0 +1,35 @@
package com.njcn.message.websocket;
import com.njcn.redis.utils.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Description:
* Date: 2025/01/14 上午 10:24【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
@EnableScheduling
public class ScheduledTasks {
@Autowired
private RedisUtil redisUtil;
@Scheduled(fixedRate = 15000) // 每15s执行一次
public void reportCurrentTime() {
List<String> likeListAllValues = (List<String>) redisUtil.getLikeListAllValues("AskRealData:");
likeListAllValues.forEach(temp->{
String lineId = temp.replace("AskRealData:","");
WebSocketServer.sendInfo("前置连接存在问题",lineId);
redisUtil.delete(temp);
});
}
}

View File

@@ -0,0 +1,41 @@
package com.njcn.message.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
/**
* Description:
* Date: 2024/12/13 15:09【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
/**
* 通信文本消息和二进制缓存区大小
* 避免对接 第三方 报文过大时Websocket 1009 错误
*
* @return
*/
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
// 在此处设置bufferSize
container.setMaxTextMessageBufferSize(10240000);
container.setMaxBinaryMessageBufferSize(10240000);
container.setMaxSessionIdleTimeout(15 * 60000L);
return container;
}
}

View File

@@ -0,0 +1,220 @@
package com.njcn.message.websocket;
import com.alibaba.fastjson.JSONObject;
import com.njcn.device.pq.api.LineFeignClient;
import com.njcn.device.pq.pojo.vo.LineDetailDataVO;
import com.njcn.message.message.AskRealDataMessage;
import com.njcn.message.produce.template.AskRealDataMessaggeTemplate;
import com.njcn.middle.rocket.domain.BaseMessage;
import com.njcn.redis.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Description:
* Date: 2024/12/13 15:11【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Slf4j
@Component
@ServerEndpoint(value ="/api/pushMessage/{userIdAndlineId}")
public class WebSocketServer {
private static AskRealDataMessaggeTemplate askRealDataMessaggeTemplate;
private static RedisUtil redisUtil;
@Autowired
public void setAskRealDataMessaggeTemplate(AskRealDataMessaggeTemplate askRealDataMessaggeTemplate) {
WebSocketServer.askRealDataMessaggeTemplate = askRealDataMessaggeTemplate;
}
@Autowired
public void setRedisUtil( RedisUtil redisUtil) {
WebSocketServer.redisUtil = redisUtil;
}
private static LineFeignClient lineFeignClient;
// 注入的时候,给类的 service 注入
@Autowired
public void setLineFeignClient(LineFeignClient lineFeignClient) {
WebSocketServer.lineFeignClient = lineFeignClient;
}
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set用来存放每个客户端对应的WebSocket对象。
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userIdAndlineId = "";
/**
* 连接建立成
* 功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userIdAndlineId") String userIdAndlineId) {
//lineId 是 userid+","+lineId
this.session = session;
this.userIdAndlineId = userIdAndlineId;
if (webSocketMap.containsKey(userIdAndlineId)) {
webSocketMap.remove(userIdAndlineId);
//加入set中
webSocketMap.put(userIdAndlineId, this);
} else {
//加入set中
webSocketMap.put(userIdAndlineId, this);
//在线数加1
addOnlineCount();
}
sendMessage("连接成功");
}
/**
* 连接关闭
* 调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userIdAndlineId)) {
webSocketMap.remove(userIdAndlineId);
//从set中删除
subOnlineCount();
}
log.info("监测点退出:" + userIdAndlineId + ",当前在线监测点数为:" + getOnlineCount());
}
/**
* 收到客户端消
* 息后调用的方法
*
* @param message 客户端发送过来的消息
**/
@OnMessage
public void onMessage(String message, Session session) {
//会每30s发送请求1次
log.info("监测点消息:" + userIdAndlineId + ",报文:" + message);
AskRealDataMessage askRealDataMessage = new AskRealDataMessage();
String[] split = userIdAndlineId.split(",");
askRealDataMessage.setLine(split[1]);
//测试设备187
// askRealDataMessage.setDevSeries("fed656b21f89abb06204e8e4dc6c375a");
LineDetailDataVO data = lineFeignClient.getLineDetailData(split[1]).getData();
askRealDataMessage.setDevSeries(data.getDevId());
BaseMessage baseMessage = new BaseMessage();
baseMessage.setMessageBoy(JSONObject.toJSONString(askRealDataMessage));
// 发送消息到topic1
askRealDataMessaggeTemplate.sendMember(baseMessage);
redisUtil.saveByKey("AskRealData:".concat(split[1]),"AskRealData:".concat(split[1]));
log.info("监测点连接:" + userIdAndlineId + ",当前在线监测点数为:" + getOnlineCount());
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("监测点错误:" + this.userIdAndlineId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务
* 器主动推送
*/
public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 发送自定
* 义消息
**/
public static void sendInfo(String message, String lineId) {
log.info("发送消息到:" + lineId + ",报文:" + message);
if (StringUtils.isNotBlank(lineId)) {
Map<String, String> stringStringMap = WebSocketServer.filterMapByKey(webSocketMap, lineId);
stringStringMap.forEach((k,v)->{
webSocketMap.get(k).sendMessage(message);
});
} else {
log.error("监测点" + lineId + ",不在线!");
}
}
/**
* 获得此时的
* 在线监测点
*
* @return
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* 在线监测点
* 数加1
*/
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
/**
* 在线监测点
* 数减1
*/
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
/**
* 过滤所有键包含指定字符串的条目
* @param map 原始的Map
* @param substring 要检查的子字符串
* @return 过滤的Map
*/
public static Map<String, String> filterMapByKey(ConcurrentHashMap<String, WebSocketServer> map, String substring) {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, WebSocketServer> entry : map.entrySet()) {
if (entry.getKey().contains(substring)) {
result.put(entry.getKey(), entry.getValue().toString());
}
}
return result;
}
}