日志记录改造由mqtt异步改造为redis list实现,TPS满足:150-200条/秒
This commit is contained in:
@@ -63,6 +63,13 @@
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.aliyun.oss</groupId>
|
||||
<artifactId>aliyun-sdk-oss</artifactId>
|
||||
<version>3.18.0</version>
|
||||
<optional>false</optional>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -45,7 +45,22 @@ public enum RedisKeyEnum {
|
||||
/**
|
||||
* 云前置心跳
|
||||
*/
|
||||
CLD_HEART_BEAT_KEY("CLD_HEART_BEAT:", 180L);
|
||||
CLD_HEART_BEAT_KEY("CLD_HEART_BEAT:", 180L),
|
||||
|
||||
/**
|
||||
* 用户日志队列
|
||||
*/
|
||||
USER_LOG_QUEUE("USER_LOG_QUEUE", -1L),
|
||||
|
||||
/**
|
||||
* 用户日志邮件推送队列
|
||||
*/
|
||||
USER_LOG_EMAIL_QUEUE("USER_LOG_EMAIL_QUEUE", -1L),
|
||||
|
||||
/**
|
||||
* 终端日志
|
||||
*/
|
||||
DEVICE_LOG_QUEUE("DEVICE_LOG_QUEUE", -1L);
|
||||
|
||||
|
||||
private final String key;
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
package com.njcn.redis.utils;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Redis 消息队列工具类
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RedisMessageQueueUtil {
|
||||
|
||||
private final RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
/**
|
||||
* 推送消息到队列(左推)
|
||||
*
|
||||
* @param queueKey 队列Key
|
||||
* @param message 消息内容(JSON字符串)
|
||||
*/
|
||||
public void pushMessage(String queueKey, String message) {
|
||||
try {
|
||||
redisTemplate.opsForList().leftPush(queueKey, message);
|
||||
} catch (Exception e) {
|
||||
log.error("推送消息到队列失败,queueKey={}", queueKey, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 阻塞式弹出消息(右弹)
|
||||
*
|
||||
* @param queueKey 队列Key
|
||||
* @param timeout 超时时间(秒)
|
||||
* @return 消息内容,超时返回null
|
||||
*/
|
||||
public String popMessage(String queueKey, long timeout) {
|
||||
try {
|
||||
Object result = redisTemplate.opsForList().rightPop(queueKey, timeout, TimeUnit.SECONDS);
|
||||
return result != null ? result.toString() : null;
|
||||
} catch (Exception e) {
|
||||
log.error("弹出消息失败,queueKey={}", queueKey, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取队列长度(用于监控)
|
||||
*
|
||||
* @param queueKey 队列Key
|
||||
* @return 队列长度
|
||||
*/
|
||||
public Long getQueueSize(String queueKey) {
|
||||
try {
|
||||
return redisTemplate.opsForList().size(queueKey);
|
||||
} catch (Exception e) {
|
||||
log.error("获取队列长度失败,queueKey={}", queueKey, e);
|
||||
return 0L;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -47,6 +47,7 @@ public class FeignConfig {
|
||||
@Bean
|
||||
public Decoder feignDecoder() {
|
||||
return (response, type) -> {
|
||||
|
||||
String bodyStr = Util.toString(response.body().asReader(Util.UTF_8));
|
||||
//对结果进行转换
|
||||
HttpResult<Object> result = PubUtils.json2obj(bodyStr, type);
|
||||
@@ -60,6 +61,8 @@ public class FeignConfig {
|
||||
}
|
||||
switch (commonResponseEnum) {
|
||||
case SUCCESS:
|
||||
// case NO_DATA:
|
||||
// case FAIL:
|
||||
return result;
|
||||
default:
|
||||
throw new BusinessException(result);
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package com.njcn.web.service.impl;
|
||||
|
||||
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||
import com.njcn.common.config.GeneralInfo;
|
||||
import com.njcn.common.pojo.annotation.OperateInfo;
|
||||
import com.njcn.common.pojo.constant.LogInfo;
|
||||
import com.njcn.common.pojo.dto.DeviceLogDTO;
|
||||
import com.njcn.common.pojo.dto.LogInfoDTO;
|
||||
@@ -10,6 +8,8 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.response.HttpResult;
|
||||
import com.njcn.common.utils.PubUtils;
|
||||
import com.njcn.common.utils.ReflectCommonUtil;
|
||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||
import com.njcn.redis.utils.RedisMessageQueueUtil;
|
||||
import com.njcn.web.advice.DeviceLog;
|
||||
import com.njcn.web.service.ILogService;
|
||||
import com.njcn.web.utils.RequestUtil;
|
||||
@@ -46,8 +46,15 @@ public class LogServiceImpl implements ILogService {
|
||||
|
||||
|
||||
private final GeneralInfo generalInfo;
|
||||
/**
|
||||
* mqtt处理日志异步发布
|
||||
*/
|
||||
// private final MqttPublisher publisher;
|
||||
|
||||
private final MqttPublisher publisher;
|
||||
/**
|
||||
* redis队列处理日志异步发布
|
||||
*/
|
||||
private final RedisMessageQueueUtil redisMessageQueueUtil;
|
||||
|
||||
|
||||
/**
|
||||
@@ -82,14 +89,27 @@ public class LogServiceImpl implements ILogService {
|
||||
String operateType = ReflectCommonUtil.getOperateTypeByMethod(returnType.getMethod());
|
||||
Integer severity = levelStringToNumber(level);
|
||||
LogInfoDTO logInfoDTO = new LogInfoDTO(loginName, userName, ip, methodDescribe, operateType, result.equalsIgnoreCase("失败") ? 0 : 1, "", severity, type.equalsIgnoreCase("业务事件") ? 0 : 1, generalInfo.getMicroServiceName(), userIndex, LocalDateTime.now());
|
||||
publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
|
||||
redisMessageQueueUtil.pushMessage(
|
||||
RedisKeyEnum.USER_LOG_QUEUE.getKey(),
|
||||
PubUtils.obj2json(logInfoDTO)
|
||||
);
|
||||
|
||||
//推送审计消息功能
|
||||
if (severity != 0) {
|
||||
if (!logInfoDTO.getLoginName().equals(LogInfo.UNKNOWN_USER)) {
|
||||
publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
/**注意,此处是给前端推送的,如果没有了mqtt协议,此处前端就无消息可消费了*/
|
||||
// publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
|
||||
//发送邮箱功能
|
||||
if (severity == 2 && logInfoDTO.getResult() == 0) {
|
||||
//publisher.send("/userLogEmailPush", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
|
||||
// redisMessageQueueUtil.pushMessage(
|
||||
// RedisKeyEnum.USER_LOG_EMAIL_QUEUE.getKey(),
|
||||
// PubUtils.obj2json(logInfoDTO)
|
||||
// );
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -97,8 +117,11 @@ public class LogServiceImpl implements ILogService {
|
||||
if (Objects.nonNull((returnType.getMethod())) && (returnType.getMethod()).isAnnotationPresent(DeviceLog.class)) {
|
||||
String deviceOperate = returnType.getMethod().getAnnotation(DeviceLog.class).operateType();
|
||||
DeviceLogDTO deviceLogDTO = new DeviceLogDTO(userName, deviceOperate, result.equalsIgnoreCase("失败") ? 0 : 1, "", loginName, userIndex);
|
||||
publisher.send("/deviceLog", PubUtils.obj2json(deviceLogDTO), 2, false);
|
||||
|
||||
// publisher.send("/deviceLog", PubUtils.obj2json(deviceLogDTO), 2, false);
|
||||
redisMessageQueueUtil.pushMessage(
|
||||
RedisKeyEnum.DEVICE_LOG_QUEUE.getKey(),
|
||||
PubUtils.obj2json(deviceLogDTO)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,7 +158,11 @@ public class LogServiceImpl implements ILogService {
|
||||
String operateType = ReflectCommonUtil.getOperateTypeByMethod(method);
|
||||
Integer severity = levelStringToNumber(level);
|
||||
LogInfoDTO logInfoDTO = new LogInfoDTO(tempLogInfo.getLoginName(), tempLogInfo.getUserName(), tempLogInfo.getIp(), ReflectCommonUtil.getMethodDescribeByMethod(method), operateType, result.equalsIgnoreCase("失败") ? 0 : 1, message, severity, type.equalsIgnoreCase("业务事件") ? 0 : 1, generalInfo.getMicroServiceName(), userIndex, LocalDateTime.now());
|
||||
publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 1, false);
|
||||
// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 1, false);
|
||||
redisMessageQueueUtil.pushMessage(
|
||||
RedisKeyEnum.USER_LOG_QUEUE.getKey(),
|
||||
PubUtils.obj2json(logInfoDTO)
|
||||
);
|
||||
auditPush(severity, logInfoDTO);
|
||||
}
|
||||
|
||||
@@ -163,7 +190,11 @@ public class LogServiceImpl implements ILogService {
|
||||
Integer severity = levelStringToNumber(level);
|
||||
String operateType = ReflectCommonUtil.getOperateTypeByMethod(method);
|
||||
LogInfoDTO logInfoDTO = new LogInfoDTO(loginName, "", ip, ReflectCommonUtil.getMethodDescribeByMethod(method), operateType, result.equalsIgnoreCase("失败") ? 0 : 1, message, severity, type.equalsIgnoreCase("业务事件") ? 0 : 1, generalInfo.getMicroServiceName(), loginName, LocalDateTime.now());
|
||||
publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 1, false);
|
||||
// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 1, false);
|
||||
redisMessageQueueUtil.pushMessage(
|
||||
RedisKeyEnum.USER_LOG_QUEUE.getKey(),
|
||||
PubUtils.obj2json(logInfoDTO)
|
||||
);
|
||||
auditPush(severity, logInfoDTO);
|
||||
}
|
||||
|
||||
@@ -171,7 +202,12 @@ public class LogServiceImpl implements ILogService {
|
||||
//推送审计消息功能
|
||||
if (severity != 0) {
|
||||
if (!logInfoDTO.getLoginName().equals(LogInfo.UNKNOWN_USER)) {
|
||||
publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
/**注意,此处是给前端推送的,如果没有了mqtt协议,此处前端就无消息可消费了*/
|
||||
// publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
// redisMessageQueueUtil.pushMessage(
|
||||
// RedisKeyEnum.USER_LOG_QUEUE.getKey(),
|
||||
// PubUtils.obj2json(logInfoDTO)
|
||||
// );
|
||||
//发送邮箱功能
|
||||
if (severity == 2 && logInfoDTO.getResult() == 0) {
|
||||
//publisher.send("/userLogEmailPush", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.njcn.event.service.majornetwork.Impl;
|
||||
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
@@ -10,14 +9,9 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||
import com.njcn.advance.api.EventCauseFeignClient;
|
||||
import com.njcn.advance.api.EventWaveAnalysisFeignClient;
|
||||
import com.njcn.advance.pojo.dto.EventAnalysisDTO;
|
||||
import com.njcn.common.pojo.constant.LogInfo;
|
||||
import com.njcn.common.pojo.dto.LogInfoDTO;
|
||||
import com.njcn.common.utils.PubUtils;
|
||||
import com.njcn.device.biz.commApi.CommTerminalGeneralClient;
|
||||
import com.njcn.device.biz.pojo.dto.DeptGetBase;
|
||||
import com.njcn.device.biz.pojo.param.DeptGetLineParam;
|
||||
import com.njcn.device.pq.api.DeptLineFeignClient;
|
||||
import com.njcn.device.pq.api.LineFeignClient;
|
||||
import com.njcn.device.pq.pojo.po.DeptLine;
|
||||
@@ -36,7 +30,6 @@ import com.njcn.system.enums.DicDataEnum;
|
||||
import com.njcn.system.pojo.po.DictData;
|
||||
import com.njcn.user.api.DeptFeignClient;
|
||||
import com.njcn.user.pojo.po.Dept;
|
||||
import com.njcn.web.utils.RequestUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.influxdb.dto.QueryResult;
|
||||
|
||||
@@ -4,9 +4,6 @@ import cn.hutool.core.codec.Base64;
|
||||
import cn.hutool.core.convert.Convert;
|
||||
import cn.hutool.core.io.IoUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.JsonObject;
|
||||
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||
import com.njcn.common.pojo.constant.LogInfo;
|
||||
import com.njcn.common.pojo.constant.SecurityConstants;
|
||||
import com.njcn.common.pojo.dto.LogInfoDTO;
|
||||
import com.njcn.common.utils.PubUtils;
|
||||
@@ -14,6 +11,8 @@ import com.njcn.gateway.enums.GateWayEnum;
|
||||
import com.njcn.gateway.security.AuthorizationManager;
|
||||
import com.njcn.gateway.utils.ResponseUtils;
|
||||
import com.njcn.gateway.utils.WebFluxRequestUtil;
|
||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||
import com.njcn.redis.utils.RedisMessageQueueUtil;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
@@ -39,7 +38,6 @@ import java.security.KeyFactory;
|
||||
import java.security.interfaces.RSAPublicKey;
|
||||
import java.security.spec.X509EncodedKeySpec;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* @author hongawen
|
||||
@@ -54,7 +52,16 @@ public class ResourceServerConfig {
|
||||
|
||||
private final WhiteListConfig whiteListConfig;
|
||||
|
||||
private final MqttPublisher publisher;
|
||||
/**
|
||||
* mqtt处理日志异步发布
|
||||
*/
|
||||
// private final MqttPublisher publisher;
|
||||
|
||||
/**
|
||||
* redis队列处理日志异步发布
|
||||
*/
|
||||
private final RedisMessageQueueUtil redisMessageQueueUtil;
|
||||
|
||||
|
||||
@Bean
|
||||
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
|
||||
@@ -108,8 +115,12 @@ public class ResourceServerConfig {
|
||||
userIndex,
|
||||
LocalDateTime.now()
|
||||
);
|
||||
publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
redisMessageQueueUtil.pushMessage(
|
||||
RedisKeyEnum.USER_LOG_QUEUE.getKey(),
|
||||
PubUtils.obj2json(logInfoDTO)
|
||||
);
|
||||
// publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
return ResponseUtils.writeErrorInfo(response, GateWayEnum.NO_AUTHORIZATION);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -0,0 +1,371 @@
|
||||
package com.njcn.system.consumer;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import com.njcn.common.pojo.constant.LogInfo;
|
||||
import com.njcn.common.pojo.constant.PatternRegex;
|
||||
import com.njcn.common.pojo.dto.LogInfoDTO;
|
||||
import com.njcn.common.utils.PubUtils;
|
||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||
import com.njcn.redis.utils.RedisMessageQueueUtil;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.service.IUserLogService;
|
||||
import com.njcn.user.api.UserFeignClient;
|
||||
import com.njcn.user.pojo.po.User;
|
||||
import com.njcn.web.utils.EmailUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Redis 日志消费者
|
||||
* 替代原 MQTT 订阅方式,从 Redis List 中消费日志消息
|
||||
*
|
||||
* 核心优化:
|
||||
* 1. 多线程并发消费(3个用户日志线程 + 1个邮件线程)
|
||||
* 2. 智能批量入库(满50条或超时2秒自动刷新)
|
||||
* 3. 实时性保证(BRPOP阻塞式消费,延迟<2秒)
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RedisLogConsumer {
|
||||
|
||||
private final RedisMessageQueueUtil redisMessageQueueUtil;
|
||||
private final IUserLogService userLogService;
|
||||
private final UserFeignClient userFeignClient;
|
||||
private final RedisUtil redisUtil;
|
||||
private final EmailUtil emailUtil;
|
||||
|
||||
private ExecutorService executorService;
|
||||
private volatile boolean running = true;
|
||||
|
||||
/**
|
||||
* 用户角色类型缓存(用于判断日志类型)
|
||||
*/
|
||||
private JSONObject userRoleTypeCache = null;
|
||||
|
||||
/**
|
||||
* 批量阈值:每批最多50条
|
||||
*/
|
||||
private static final int BATCH_SIZE = 50;
|
||||
|
||||
/**
|
||||
* 超时阈值:2秒未满50条也强制刷新
|
||||
*/
|
||||
private static final long FLUSH_TIMEOUT_MS = 2000;
|
||||
|
||||
@PostConstruct
|
||||
public void startConsume() {
|
||||
log.info("启动 Redis 日志消费者...");
|
||||
|
||||
// 创建消费线程(3个用户日志 + 1个邮件推送 + 1个设备日志,根据需要调整)
|
||||
executorService = Executors.newFixedThreadPool(5, r -> {
|
||||
Thread thread = new Thread(r);
|
||||
thread.setDaemon(false); // 非守护线程,确保消息消费完
|
||||
return thread;
|
||||
});
|
||||
|
||||
// 启动3个用户日志消费线程
|
||||
for (int i = 0; i < 3; i++) {
|
||||
final int threadId = i;
|
||||
executorService.submit(() -> consumeUserLogWithBatch(threadId));
|
||||
}
|
||||
|
||||
// 启动1个邮件推送消费线程
|
||||
executorService.submit(this::consumeEmailPush);
|
||||
|
||||
// 启动1个设备日志消费线程(可选)
|
||||
// executorService.submit(this::consumeDeviceLog);
|
||||
|
||||
log.info("Redis 日志消费者启动完成(3个用户日志线程 + 1个邮件线程)");
|
||||
}
|
||||
|
||||
/**
|
||||
* 消费用户日志(带批量优化)
|
||||
*/
|
||||
private void consumeUserLogWithBatch(int threadId) {
|
||||
log.info("用户日志消费线程-{} 启动", threadId);
|
||||
|
||||
List<String> buffer = new ArrayList<>(BATCH_SIZE);
|
||||
long lastFlushTime = System.currentTimeMillis();
|
||||
|
||||
while (running) {
|
||||
try {
|
||||
// BRPOP 阻塞1秒(缩短超时时间,用于定期检查批量刷新条件)
|
||||
String logJson = redisMessageQueueUtil.popMessage(
|
||||
RedisKeyEnum.USER_LOG_QUEUE.getKey(),
|
||||
1
|
||||
);
|
||||
|
||||
// 收集到消息,加入缓冲区
|
||||
if (logJson != null) {
|
||||
buffer.add(logJson);
|
||||
}
|
||||
|
||||
// 批量刷新条件:满50条 OR 距离上次刷新超过2秒
|
||||
boolean shouldFlush = buffer.size() >= BATCH_SIZE ||
|
||||
(System.currentTimeMillis() - lastFlushTime > FLUSH_TIMEOUT_MS && !buffer.isEmpty());
|
||||
|
||||
if (shouldFlush) {
|
||||
batchProcessUserLogs(buffer, threadId);
|
||||
buffer.clear();
|
||||
lastFlushTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("线程-{} 消费用户日志失败", threadId, e);
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 线程退出前,刷新剩余缓冲区
|
||||
if (!buffer.isEmpty()) {
|
||||
batchProcessUserLogs(buffer, threadId);
|
||||
}
|
||||
|
||||
log.info("用户日志消费线程-{} 退出", threadId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量处理用户日志
|
||||
*/
|
||||
private void batchProcessUserLogs(List<String> logJsonList, int threadId) {
|
||||
if (logJsonList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 确保用户角色类型缓存已加载
|
||||
loadUserRoleTypeCache();
|
||||
|
||||
// 批量解析并处理
|
||||
for (String logJson : logJsonList) {
|
||||
try {
|
||||
LogInfoDTO logInfoDTO = PubUtils.json2obj(logJson, LogInfoDTO.class);
|
||||
// 过滤无效日志
|
||||
if (logInfoDTO.getLoginName().equals(LogInfo.UNKNOWN_USER)
|
||||
|| logInfoDTO.getOperate().equals(LogInfo.UNKNOWN_OPERATE)
|
||||
|| logInfoDTO.getIp().equals(LogInfo.UNKNOWN_IP)) {
|
||||
continue;
|
||||
}
|
||||
// 设置日志类型(业务事件=0,系统事件=1)
|
||||
if ("注销".equals(logInfoDTO.getOperateType()) || "认证".equals(logInfoDTO.getOperateType())) {
|
||||
logInfoDTO.setType(1); // 系统事件
|
||||
} else {
|
||||
String loginName = logInfoDTO.getLoginName();
|
||||
if (StrUtil.isNotBlank(loginName) && userRoleTypeCache.containsKey(loginName)) {
|
||||
if ((Integer) userRoleTypeCache.get(loginName) == 2) {
|
||||
logInfoDTO.setType(0); // 业务事件
|
||||
} else {
|
||||
logInfoDTO.setType(1); // 系统事件
|
||||
}
|
||||
} else {
|
||||
logInfoDTO.setType(1); // 默认系统事件
|
||||
}
|
||||
}
|
||||
// 入库
|
||||
userLogService.addUserLog(logInfoDTO);
|
||||
} catch (Exception e) {
|
||||
log.error("解析日志失败: {}", logJson, e);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("线程-{} 批量入库 {} 条日志", threadId, logJsonList.size());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("线程-{} 批量处理日志失败,涉及 {} 条", threadId, logJsonList.size(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 加载用户角色类型缓存
|
||||
*/
|
||||
private void loadUserRoleTypeCache() {
|
||||
if (Objects.isNull(userRoleTypeCache)) {
|
||||
userRoleTypeCache = (JSONObject) redisUtil.getObjectByKey(RedisKeyEnum.USER_ROLE_TYPE_KEY.getKey());
|
||||
if (Objects.isNull(userRoleTypeCache)) {
|
||||
// 调用 Feign 接口获取用户角色列表(会自动写入 Redis)
|
||||
userFeignClient.userRoleList();
|
||||
userRoleTypeCache = (JSONObject) redisUtil.getObjectByKey(RedisKeyEnum.USER_ROLE_TYPE_KEY.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 降级处理:批量失败时逐条处理(已移除,直接在批量方法中逐条调用)
|
||||
*/
|
||||
private void fallbackProcessOneByOne(List<String> logJsonList, int threadId) {
|
||||
log.warn("线程-{} 批量入库失败,降级为逐条处理", threadId);
|
||||
|
||||
// 确保用户角色类型缓存已加载
|
||||
loadUserRoleTypeCache();
|
||||
|
||||
for (String logJson : logJsonList) {
|
||||
try {
|
||||
LogInfoDTO logInfoDTO = PubUtils.json2obj(logJson, LogInfoDTO.class);
|
||||
|
||||
// 过滤无效日志
|
||||
if (logInfoDTO.getLoginName().equals(LogInfo.UNKNOWN_USER)
|
||||
|| logInfoDTO.getOperate().equals(LogInfo.UNKNOWN_OPERATE)
|
||||
|| logInfoDTO.getIp().equals(LogInfo.UNKNOWN_IP)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 设置日志类型
|
||||
if ("注销".equals(logInfoDTO.getOperateType()) || "认证".equals(logInfoDTO.getOperateType())) {
|
||||
logInfoDTO.setType(1);
|
||||
} else {
|
||||
String loginName = logInfoDTO.getLoginName();
|
||||
if (StrUtil.isNotBlank(loginName) && userRoleTypeCache.containsKey(loginName)) {
|
||||
if ((Integer) userRoleTypeCache.get(loginName) == 2) {
|
||||
logInfoDTO.setType(0);
|
||||
} else {
|
||||
logInfoDTO.setType(1);
|
||||
}
|
||||
} else {
|
||||
logInfoDTO.setType(1);
|
||||
}
|
||||
}
|
||||
|
||||
userLogService.addUserLog(logInfoDTO);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("线程-{} 单条处理也失败: {}", threadId, logJson, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 消费邮件推送(无需批量优化,邮件量少)
|
||||
*/
|
||||
private void consumeEmailPush() {
|
||||
log.info("邮件推送消费线程启动");
|
||||
|
||||
while (running) {
|
||||
try {
|
||||
String pushJson = redisMessageQueueUtil.popMessage(
|
||||
RedisKeyEnum.USER_LOG_EMAIL_QUEUE.getKey(),
|
||||
3 // 邮件推送不频繁,5秒超时即可
|
||||
);
|
||||
if (pushJson != null) {
|
||||
log.debug("消费邮件推送: {}", pushJson);
|
||||
// 解析日志DTO
|
||||
LogInfoDTO auditLogVO = PubUtils.json2obj(pushJson, LogInfoDTO.class);
|
||||
// 获取审计管理员的邮箱列表
|
||||
List<User> data = userFeignClient.getUserListByRoleCode("audit_manager").getData();
|
||||
List<String> emails = data.stream()
|
||||
.filter(x -> Pattern.matches(PatternRegex.EMAIL_REGEX, x.getEmail()))
|
||||
.filter(x -> StrUtil.isNotBlank(x.getEmail()) && x.getEmailNotice() == 1)
|
||||
.map(User::getEmail)
|
||||
.collect(Collectors.toList());
|
||||
// 发送邮件
|
||||
if (CollUtil.isNotEmpty(emails)) {
|
||||
StringBuilder describe = new StringBuilder();
|
||||
describe.append(auditLogVO.getType() == 0 ? "业务事件 -> " : "系统事件 -> ");
|
||||
describe.append(auditLogVO.getUserName()).append("在")
|
||||
.append(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(auditLogVO.getCreateTime()))
|
||||
.append("在")
|
||||
.append(auditLogVO.getIp())
|
||||
.append("执行了")
|
||||
.append(auditLogVO.getOperate())
|
||||
.append(",结果为");
|
||||
if (auditLogVO.getResult() == 1) {
|
||||
describe.append("成功");
|
||||
}
|
||||
if (auditLogVO.getResult() == 0) {
|
||||
describe.append("失败").append(",失败原因为").append(auditLogVO.getFailReason());
|
||||
}
|
||||
emailUtil.sendMultiple(emails, "告警消息", describe.toString(), false);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("消费邮件推送失败", e);
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("邮件推送消费线程退出");
|
||||
}
|
||||
|
||||
/**
|
||||
* 消费设备日志(可选,如果需要处理设备日志)
|
||||
*/
|
||||
private void consumeDeviceLog() {
|
||||
log.info("设备日志消费线程启动");
|
||||
|
||||
while (running) {
|
||||
try {
|
||||
String deviceLogJson = redisMessageQueueUtil.popMessage(
|
||||
RedisKeyEnum.DEVICE_LOG_QUEUE.getKey(),
|
||||
5 // 设备日志不频繁,5秒超时即可
|
||||
);
|
||||
|
||||
if (deviceLogJson != null) {
|
||||
log.debug("消费设备日志: {}", deviceLogJson);
|
||||
// TODO: 解析并保存设备日志
|
||||
// DeviceLog deviceLog = JSONUtil.toBean(deviceLogJson, DeviceLog.class);
|
||||
// deviceLogService.save(deviceLog);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("消费设备日志失败", e);
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("设备日志消费线程退出");
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
log.info("开始停止 Redis 日志消费者...");
|
||||
running = false;
|
||||
|
||||
if (executorService != null) {
|
||||
executorService.shutdown();
|
||||
try {
|
||||
// 等待60秒,让队列中的消息处理完
|
||||
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
|
||||
log.warn("消费线程未在60秒内完成,强制关闭");
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
log.error("等待消费线程关闭被中断", e);
|
||||
executorService.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Redis 日志消费者已停止");
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,8 @@
|
||||
package com.njcn.system.handler;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import com.github.tocrhz.mqtt.annotation.MqttSubscribe;
|
||||
import com.github.tocrhz.mqtt.annotation.Payload;
|
||||
import com.njcn.common.pojo.constant.LogInfo;
|
||||
import com.njcn.common.pojo.constant.PatternRegex;
|
||||
@@ -16,17 +14,14 @@ import com.njcn.system.service.IUserLogService;
|
||||
import com.njcn.user.api.UserFeignClient;
|
||||
import com.njcn.user.pojo.po.User;
|
||||
import com.njcn.web.utils.EmailUtil;
|
||||
import io.swagger.models.auth.In;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.springframework.data.redis.core.BoundHashOperations;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -49,11 +44,12 @@ public class MqttMessageHandler {
|
||||
|
||||
|
||||
|
||||
|
||||
private JSONObject jsonObject = null;
|
||||
/**
|
||||
* 订阅审计日志的记录,并进行入库操作
|
||||
*/
|
||||
@MqttSubscribe(value = "/userLog")
|
||||
// @MqttSubscribe(value = "/userLog")
|
||||
public void subUserLog(String topic, MqttMessage message, @Payload String payload) {
|
||||
if(Objects.isNull(jsonObject)){
|
||||
jsonObject = (JSONObject) redisUtil.getObjectByKey(RedisKeyEnum.USER_ROLE_TYPE_KEY.getKey());
|
||||
@@ -86,7 +82,7 @@ public class MqttMessageHandler {
|
||||
/**
|
||||
* 订阅审计日志的记录,并进行入库操作
|
||||
*/
|
||||
@MqttSubscribe(value = "/userLogEmailPush")
|
||||
// @MqttSubscribe(value = "/userLogEmailPush")
|
||||
public void subUserLogEmail(String topic, MqttMessage message, @Payload String payload) {
|
||||
LogInfoDTO auditLogVO = PubUtils.json2obj(new String(message.getPayload(), StandardCharsets.UTF_8),LogInfoDTO.class);
|
||||
List<User> data = userFeignClient.getUserListByRoleCode("audit_manager").getData();
|
||||
|
||||
@@ -21,7 +21,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||
import com.nimbusds.jose.JWSObject;
|
||||
import com.njcn.common.config.GeneralInfo;
|
||||
import com.njcn.common.pojo.constant.OperateType;
|
||||
@@ -37,6 +36,8 @@ import com.njcn.db.constant.DbConstant;
|
||||
import com.njcn.oss.constant.GeneralConstant;
|
||||
import com.njcn.oss.constant.OssPath;
|
||||
import com.njcn.oss.utils.FileStorageUtil;
|
||||
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||
import com.njcn.redis.utils.RedisMessageQueueUtil;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.enums.SystemResponseEnum;
|
||||
import com.njcn.system.excel.UserLogExcel;
|
||||
@@ -93,7 +94,15 @@ public class AuditServiceImpl extends ServiceImpl<UserLogMapper, UserLog> implem
|
||||
|
||||
private final IConfigService iConfigService;
|
||||
|
||||
private final MqttPublisher publisher;
|
||||
/**
|
||||
* mqtt处理日志异步发布
|
||||
*/
|
||||
// private final MqttPublisher publisher;
|
||||
|
||||
/**
|
||||
* redis队列处理日志异步发布
|
||||
*/
|
||||
private final RedisMessageQueueUtil redisMessageQueueUtil;
|
||||
|
||||
private final FileStorageUtil fileStorageUtil;
|
||||
|
||||
@@ -188,7 +197,11 @@ public class AuditServiceImpl extends ServiceImpl<UserLogMapper, UserLog> implem
|
||||
if (auditParam.isExport()) {
|
||||
methodDescribe = "审计日志列表导出";
|
||||
LogInfoDTO logInfoDTO = new LogInfoDTO(loginName, userName, ip, methodDescribe, operateType, "失败".equalsIgnoreCase(result) ? 0 : 1, "", severity, "业务事件".equalsIgnoreCase(type) ? 0 : 1, generalInfo.getMicroServiceName(), userIndex, LocalDateTime.now());
|
||||
publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
redisMessageQueueUtil.pushMessage(
|
||||
RedisKeyEnum.USER_LOG_QUEUE.getKey(),
|
||||
PubUtils.obj2json(logInfoDTO)
|
||||
);
|
||||
// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
} else {
|
||||
if (StrUtil.isNotBlank(auditParam.getSortBy()) && StrUtil.isNotBlank(auditParam.getOrderBy())) {
|
||||
methodDescribe = methodDescribe.concat("并以")
|
||||
@@ -197,7 +210,11 @@ public class AuditServiceImpl extends ServiceImpl<UserLogMapper, UserLog> implem
|
||||
.concat(DbConstant.DESC.equalsIgnoreCase(auditParam.getOrderBy()) ? "降序" : "升序")
|
||||
.concat("查询");
|
||||
LogInfoDTO logInfoDTO = new LogInfoDTO(loginName, userName, ip, methodDescribe, operateType, "失败".equalsIgnoreCase(result) ? 0 : 1, "", severity, "业务事件".equalsIgnoreCase(type) ? 0 : 1, generalInfo.getMicroServiceName(), userIndex, LocalDateTime.now());
|
||||
publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
redisMessageQueueUtil.pushMessage(
|
||||
RedisKeyEnum.USER_LOG_QUEUE.getKey(),
|
||||
PubUtils.obj2json(logInfoDTO)
|
||||
);
|
||||
// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user