From 23c4a7838384d643dfa019ae210ef7aa0baef8c1 Mon Sep 17 00:00:00 2001 From: hongawen <83944980@qq.com> Date: Fri, 16 Jan 2026 15:14:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95=E6=94=B9?= =?UTF-8?q?=E9=80=A0=E7=94=B1mqtt=E5=BC=82=E6=AD=A5=E6=94=B9=E9=80=A0?= =?UTF-8?q?=E4=B8=BAredis=20list=E5=AE=9E=E7=8E=B0=EF=BC=8CTPS=E6=BB=A1?= =?UTF-8?q?=E8=B6=B3=EF=BC=9A150-200=E6=9D=A1/=E7=A7=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pqs-common/common-oss/pom.xml | 7 + .../njcn/redis/pojo/enums/RedisKeyEnum.java | 17 +- .../redis/utils/RedisMessageQueueUtil.java | 66 ++++ .../java/com/njcn/web/config/FeignConfig.java | 3 + .../njcn/web/service/impl/LogServiceImpl.java | 56 ++- .../Impl/EventDetailServiceImpl.java | 7 - .../gateway/config/ResourceServerConfig.java | 25 +- .../system/consumer/RedisLogConsumer.java | 371 ++++++++++++++++++ .../system/handler/MqttMessageHandler.java | 10 +- .../system/service/impl/AuditServiceImpl.java | 25 +- 10 files changed, 551 insertions(+), 36 deletions(-) create mode 100644 pqs-common/common-redis/src/main/java/com/njcn/redis/utils/RedisMessageQueueUtil.java create mode 100644 pqs-system/system-boot/src/main/java/com/njcn/system/consumer/RedisLogConsumer.java diff --git a/pqs-common/common-oss/pom.xml b/pqs-common/common-oss/pom.xml index c31b7a36f..86d8a65be 100644 --- a/pqs-common/common-oss/pom.xml +++ b/pqs-common/common-oss/pom.xml @@ -63,6 +63,13 @@ + + + com.aliyun.oss + aliyun-sdk-oss + 3.18.0 + false + \ No newline at end of file diff --git a/pqs-common/common-redis/src/main/java/com/njcn/redis/pojo/enums/RedisKeyEnum.java b/pqs-common/common-redis/src/main/java/com/njcn/redis/pojo/enums/RedisKeyEnum.java index 9b305da0e..0d16c1eec 100644 --- a/pqs-common/common-redis/src/main/java/com/njcn/redis/pojo/enums/RedisKeyEnum.java +++ b/pqs-common/common-redis/src/main/java/com/njcn/redis/pojo/enums/RedisKeyEnum.java @@ -45,7 +45,22 @@ public enum RedisKeyEnum { /** * 云前置心跳 */ - CLD_HEART_BEAT_KEY("CLD_HEART_BEAT:", 180L); + CLD_HEART_BEAT_KEY("CLD_HEART_BEAT:", 180L), + + /** + * 用户日志队列 + */ + USER_LOG_QUEUE("USER_LOG_QUEUE", -1L), + + /** + * 用户日志邮件推送队列 + */ + USER_LOG_EMAIL_QUEUE("USER_LOG_EMAIL_QUEUE", -1L), + + /** + * 终端日志 + */ + DEVICE_LOG_QUEUE("DEVICE_LOG_QUEUE", -1L); private final String key; diff --git a/pqs-common/common-redis/src/main/java/com/njcn/redis/utils/RedisMessageQueueUtil.java b/pqs-common/common-redis/src/main/java/com/njcn/redis/utils/RedisMessageQueueUtil.java new file mode 100644 index 000000000..5ec6f4454 --- /dev/null +++ b/pqs-common/common-redis/src/main/java/com/njcn/redis/utils/RedisMessageQueueUtil.java @@ -0,0 +1,66 @@ +package com.njcn.redis.utils; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * Redis 消息队列工具类 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RedisMessageQueueUtil { + + private final RedisTemplate redisTemplate; + + /** + * 推送消息到队列(左推) + * + * @param queueKey 队列Key + * @param message 消息内容(JSON字符串) + */ + public void pushMessage(String queueKey, String message) { + try { + redisTemplate.opsForList().leftPush(queueKey, message); + } catch (Exception e) { + log.error("推送消息到队列失败,queueKey={}", queueKey, e); + throw e; + } + } + + /** + * 阻塞式弹出消息(右弹) + * + * @param queueKey 队列Key + * @param timeout 超时时间(秒) + * @return 消息内容,超时返回null + */ + public String popMessage(String queueKey, long timeout) { + try { + Object result = redisTemplate.opsForList().rightPop(queueKey, timeout, TimeUnit.SECONDS); + return result != null ? result.toString() : null; + } catch (Exception e) { + log.error("弹出消息失败,queueKey={}", queueKey, e); + return null; + } + } + + /** + * 获取队列长度(用于监控) + * + * @param queueKey 队列Key + * @return 队列长度 + */ + public Long getQueueSize(String queueKey) { + try { + return redisTemplate.opsForList().size(queueKey); + } catch (Exception e) { + log.error("获取队列长度失败,queueKey={}", queueKey, e); + return 0L; + } + } +} \ No newline at end of file diff --git a/pqs-common/common-web/src/main/java/com/njcn/web/config/FeignConfig.java b/pqs-common/common-web/src/main/java/com/njcn/web/config/FeignConfig.java index d8fdfd33e..ec519054b 100644 --- a/pqs-common/common-web/src/main/java/com/njcn/web/config/FeignConfig.java +++ b/pqs-common/common-web/src/main/java/com/njcn/web/config/FeignConfig.java @@ -47,6 +47,7 @@ public class FeignConfig { @Bean public Decoder feignDecoder() { return (response, type) -> { + String bodyStr = Util.toString(response.body().asReader(Util.UTF_8)); //对结果进行转换 HttpResult result = PubUtils.json2obj(bodyStr, type); @@ -60,6 +61,8 @@ public class FeignConfig { } switch (commonResponseEnum) { case SUCCESS: +// case NO_DATA: +// case FAIL: return result; default: throw new BusinessException(result); diff --git a/pqs-common/common-web/src/main/java/com/njcn/web/service/impl/LogServiceImpl.java b/pqs-common/common-web/src/main/java/com/njcn/web/service/impl/LogServiceImpl.java index c1b697e26..f53ce3757 100644 --- a/pqs-common/common-web/src/main/java/com/njcn/web/service/impl/LogServiceImpl.java +++ b/pqs-common/common-web/src/main/java/com/njcn/web/service/impl/LogServiceImpl.java @@ -1,8 +1,6 @@ package com.njcn.web.service.impl; -import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.common.config.GeneralInfo; -import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.constant.LogInfo; import com.njcn.common.pojo.dto.DeviceLogDTO; import com.njcn.common.pojo.dto.LogInfoDTO; @@ -10,6 +8,8 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.PubUtils; import com.njcn.common.utils.ReflectCommonUtil; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisMessageQueueUtil; import com.njcn.web.advice.DeviceLog; import com.njcn.web.service.ILogService; import com.njcn.web.utils.RequestUtil; @@ -46,8 +46,15 @@ public class LogServiceImpl implements ILogService { private final GeneralInfo generalInfo; + /** + * mqtt处理日志异步发布 + */ +// private final MqttPublisher publisher; - private final MqttPublisher publisher; + /** + * redis队列处理日志异步发布 + */ + private final RedisMessageQueueUtil redisMessageQueueUtil; /** @@ -82,14 +89,27 @@ public class LogServiceImpl implements ILogService { String operateType = ReflectCommonUtil.getOperateTypeByMethod(returnType.getMethod()); Integer severity = levelStringToNumber(level); LogInfoDTO logInfoDTO = new LogInfoDTO(loginName, userName, ip, methodDescribe, operateType, result.equalsIgnoreCase("失败") ? 0 : 1, "", severity, type.equalsIgnoreCase("业务事件") ? 0 : 1, generalInfo.getMicroServiceName(), userIndex, LocalDateTime.now()); - publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false); +// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false); + + redisMessageQueueUtil.pushMessage( + RedisKeyEnum.USER_LOG_QUEUE.getKey(), + PubUtils.obj2json(logInfoDTO) + ); + //推送审计消息功能 if (severity != 0) { if (!logInfoDTO.getLoginName().equals(LogInfo.UNKNOWN_USER)) { - publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false); + /**注意,此处是给前端推送的,如果没有了mqtt协议,此处前端就无消息可消费了*/ +// publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false); + //发送邮箱功能 if (severity == 2 && logInfoDTO.getResult() == 0) { //publisher.send("/userLogEmailPush", PubUtils.obj2json(logInfoDTO), 2, false); + +// redisMessageQueueUtil.pushMessage( +// RedisKeyEnum.USER_LOG_EMAIL_QUEUE.getKey(), +// PubUtils.obj2json(logInfoDTO) +// ); } } } @@ -97,8 +117,11 @@ public class LogServiceImpl implements ILogService { if (Objects.nonNull((returnType.getMethod())) && (returnType.getMethod()).isAnnotationPresent(DeviceLog.class)) { String deviceOperate = returnType.getMethod().getAnnotation(DeviceLog.class).operateType(); DeviceLogDTO deviceLogDTO = new DeviceLogDTO(userName, deviceOperate, result.equalsIgnoreCase("失败") ? 0 : 1, "", loginName, userIndex); - publisher.send("/deviceLog", PubUtils.obj2json(deviceLogDTO), 2, false); - +// publisher.send("/deviceLog", PubUtils.obj2json(deviceLogDTO), 2, false); + redisMessageQueueUtil.pushMessage( + RedisKeyEnum.DEVICE_LOG_QUEUE.getKey(), + PubUtils.obj2json(deviceLogDTO) + ); } } @@ -135,7 +158,11 @@ public class LogServiceImpl implements ILogService { String operateType = ReflectCommonUtil.getOperateTypeByMethod(method); Integer severity = levelStringToNumber(level); LogInfoDTO logInfoDTO = new LogInfoDTO(tempLogInfo.getLoginName(), tempLogInfo.getUserName(), tempLogInfo.getIp(), ReflectCommonUtil.getMethodDescribeByMethod(method), operateType, result.equalsIgnoreCase("失败") ? 0 : 1, message, severity, type.equalsIgnoreCase("业务事件") ? 0 : 1, generalInfo.getMicroServiceName(), userIndex, LocalDateTime.now()); - publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 1, false); +// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 1, false); + redisMessageQueueUtil.pushMessage( + RedisKeyEnum.USER_LOG_QUEUE.getKey(), + PubUtils.obj2json(logInfoDTO) + ); auditPush(severity, logInfoDTO); } @@ -163,7 +190,11 @@ public class LogServiceImpl implements ILogService { Integer severity = levelStringToNumber(level); String operateType = ReflectCommonUtil.getOperateTypeByMethod(method); LogInfoDTO logInfoDTO = new LogInfoDTO(loginName, "", ip, ReflectCommonUtil.getMethodDescribeByMethod(method), operateType, result.equalsIgnoreCase("失败") ? 0 : 1, message, severity, type.equalsIgnoreCase("业务事件") ? 0 : 1, generalInfo.getMicroServiceName(), loginName, LocalDateTime.now()); - publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 1, false); +// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 1, false); + redisMessageQueueUtil.pushMessage( + RedisKeyEnum.USER_LOG_QUEUE.getKey(), + PubUtils.obj2json(logInfoDTO) + ); auditPush(severity, logInfoDTO); } @@ -171,7 +202,12 @@ public class LogServiceImpl implements ILogService { //推送审计消息功能 if (severity != 0) { if (!logInfoDTO.getLoginName().equals(LogInfo.UNKNOWN_USER)) { - publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false); + /**注意,此处是给前端推送的,如果没有了mqtt协议,此处前端就无消息可消费了*/ +// publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false); +// redisMessageQueueUtil.pushMessage( +// RedisKeyEnum.USER_LOG_QUEUE.getKey(), +// PubUtils.obj2json(logInfoDTO) +// ); //发送邮箱功能 if (severity == 2 && logInfoDTO.getResult() == 0) { //publisher.send("/userLogEmailPush", PubUtils.obj2json(logInfoDTO), 2, false); diff --git a/pqs-event/event-boot/src/main/java/com/njcn/event/service/majornetwork/Impl/EventDetailServiceImpl.java b/pqs-event/event-boot/src/main/java/com/njcn/event/service/majornetwork/Impl/EventDetailServiceImpl.java index 3bf50aadf..c8a5599ef 100644 --- a/pqs-event/event-boot/src/main/java/com/njcn/event/service/majornetwork/Impl/EventDetailServiceImpl.java +++ b/pqs-event/event-boot/src/main/java/com/njcn/event/service/majornetwork/Impl/EventDetailServiceImpl.java @@ -2,7 +2,6 @@ package com.njcn.event.service.majornetwork.Impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -10,14 +9,9 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.advance.api.EventCauseFeignClient; -import com.njcn.advance.api.EventWaveAnalysisFeignClient; import com.njcn.advance.pojo.dto.EventAnalysisDTO; -import com.njcn.common.pojo.constant.LogInfo; -import com.njcn.common.pojo.dto.LogInfoDTO; import com.njcn.common.utils.PubUtils; import com.njcn.device.biz.commApi.CommTerminalGeneralClient; -import com.njcn.device.biz.pojo.dto.DeptGetBase; -import com.njcn.device.biz.pojo.param.DeptGetLineParam; import com.njcn.device.pq.api.DeptLineFeignClient; import com.njcn.device.pq.api.LineFeignClient; import com.njcn.device.pq.pojo.po.DeptLine; @@ -36,7 +30,6 @@ import com.njcn.system.enums.DicDataEnum; import com.njcn.system.pojo.po.DictData; import com.njcn.user.api.DeptFeignClient; import com.njcn.user.pojo.po.Dept; -import com.njcn.web.utils.RequestUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.influxdb.dto.QueryResult; diff --git a/pqs-gateway/src/main/java/com/njcn/gateway/config/ResourceServerConfig.java b/pqs-gateway/src/main/java/com/njcn/gateway/config/ResourceServerConfig.java index 028c2a87b..a79fea6cc 100644 --- a/pqs-gateway/src/main/java/com/njcn/gateway/config/ResourceServerConfig.java +++ b/pqs-gateway/src/main/java/com/njcn/gateway/config/ResourceServerConfig.java @@ -4,9 +4,6 @@ import cn.hutool.core.codec.Base64; import cn.hutool.core.convert.Convert; import cn.hutool.core.io.IoUtil; import cn.hutool.json.JSONObject; -import com.alibaba.nacos.shaded.com.google.gson.JsonObject; -import com.github.tocrhz.mqtt.publisher.MqttPublisher; -import com.njcn.common.pojo.constant.LogInfo; import com.njcn.common.pojo.constant.SecurityConstants; import com.njcn.common.pojo.dto.LogInfoDTO; import com.njcn.common.utils.PubUtils; @@ -14,6 +11,8 @@ import com.njcn.gateway.enums.GateWayEnum; import com.njcn.gateway.security.AuthorizationManager; import com.njcn.gateway.utils.ResponseUtils; import com.njcn.gateway.utils.WebFluxRequestUtil; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisMessageQueueUtil; import lombok.AllArgsConstructor; import lombok.SneakyThrows; import org.springframework.context.annotation.Bean; @@ -39,7 +38,6 @@ import java.security.KeyFactory; import java.security.interfaces.RSAPublicKey; import java.security.spec.X509EncodedKeySpec; import java.time.LocalDateTime; -import java.util.Objects; /** * @author hongawen @@ -54,7 +52,16 @@ public class ResourceServerConfig { private final WhiteListConfig whiteListConfig; - private final MqttPublisher publisher; + /** + * mqtt处理日志异步发布 + */ +// private final MqttPublisher publisher; + + /** + * redis队列处理日志异步发布 + */ + private final RedisMessageQueueUtil redisMessageQueueUtil; + @Bean public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) { @@ -108,8 +115,12 @@ public class ResourceServerConfig { userIndex, LocalDateTime.now() ); - publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false); - publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false); +// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false); + redisMessageQueueUtil.pushMessage( + RedisKeyEnum.USER_LOG_QUEUE.getKey(), + PubUtils.obj2json(logInfoDTO) + ); +// publisher.send("/userLogPush", PubUtils.obj2json(logInfoDTO), 2, false); return ResponseUtils.writeErrorInfo(response, GateWayEnum.NO_AUTHORIZATION); }); } diff --git a/pqs-system/system-boot/src/main/java/com/njcn/system/consumer/RedisLogConsumer.java b/pqs-system/system-boot/src/main/java/com/njcn/system/consumer/RedisLogConsumer.java new file mode 100644 index 000000000..331803512 --- /dev/null +++ b/pqs-system/system-boot/src/main/java/com/njcn/system/consumer/RedisLogConsumer.java @@ -0,0 +1,371 @@ +package com.njcn.system.consumer; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONObject; +import com.njcn.common.pojo.constant.LogInfo; +import com.njcn.common.pojo.constant.PatternRegex; +import com.njcn.common.pojo.dto.LogInfoDTO; +import com.njcn.common.utils.PubUtils; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisMessageQueueUtil; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.system.service.IUserLogService; +import com.njcn.user.api.UserFeignClient; +import com.njcn.user.pojo.po.User; +import com.njcn.web.utils.EmailUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Redis 日志消费者 + * 替代原 MQTT 订阅方式,从 Redis List 中消费日志消息 + * + * 核心优化: + * 1. 多线程并发消费(3个用户日志线程 + 1个邮件线程) + * 2. 智能批量入库(满50条或超时2秒自动刷新) + * 3. 实时性保证(BRPOP阻塞式消费,延迟<2秒) + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RedisLogConsumer { + + private final RedisMessageQueueUtil redisMessageQueueUtil; + private final IUserLogService userLogService; + private final UserFeignClient userFeignClient; + private final RedisUtil redisUtil; + private final EmailUtil emailUtil; + + private ExecutorService executorService; + private volatile boolean running = true; + + /** + * 用户角色类型缓存(用于判断日志类型) + */ + private JSONObject userRoleTypeCache = null; + + /** + * 批量阈值:每批最多50条 + */ + private static final int BATCH_SIZE = 50; + + /** + * 超时阈值:2秒未满50条也强制刷新 + */ + private static final long FLUSH_TIMEOUT_MS = 2000; + + @PostConstruct + public void startConsume() { + log.info("启动 Redis 日志消费者..."); + + // 创建消费线程(3个用户日志 + 1个邮件推送 + 1个设备日志,根据需要调整) + executorService = Executors.newFixedThreadPool(5, r -> { + Thread thread = new Thread(r); + thread.setDaemon(false); // 非守护线程,确保消息消费完 + return thread; + }); + + // 启动3个用户日志消费线程 + for (int i = 0; i < 3; i++) { + final int threadId = i; + executorService.submit(() -> consumeUserLogWithBatch(threadId)); + } + + // 启动1个邮件推送消费线程 + executorService.submit(this::consumeEmailPush); + + // 启动1个设备日志消费线程(可选) + // executorService.submit(this::consumeDeviceLog); + + log.info("Redis 日志消费者启动完成(3个用户日志线程 + 1个邮件线程)"); + } + + /** + * 消费用户日志(带批量优化) + */ + private void consumeUserLogWithBatch(int threadId) { + log.info("用户日志消费线程-{} 启动", threadId); + + List buffer = new ArrayList<>(BATCH_SIZE); + long lastFlushTime = System.currentTimeMillis(); + + while (running) { + try { + // BRPOP 阻塞1秒(缩短超时时间,用于定期检查批量刷新条件) + String logJson = redisMessageQueueUtil.popMessage( + RedisKeyEnum.USER_LOG_QUEUE.getKey(), + 1 + ); + + // 收集到消息,加入缓冲区 + if (logJson != null) { + buffer.add(logJson); + } + + // 批量刷新条件:满50条 OR 距离上次刷新超过2秒 + boolean shouldFlush = buffer.size() >= BATCH_SIZE || + (System.currentTimeMillis() - lastFlushTime > FLUSH_TIMEOUT_MS && !buffer.isEmpty()); + + if (shouldFlush) { + batchProcessUserLogs(buffer, threadId); + buffer.clear(); + lastFlushTime = System.currentTimeMillis(); + } + + } catch (Exception e) { + log.error("线程-{} 消费用户日志失败", threadId, e); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + break; + } + } + } + + // 线程退出前,刷新剩余缓冲区 + if (!buffer.isEmpty()) { + batchProcessUserLogs(buffer, threadId); + } + + log.info("用户日志消费线程-{} 退出", threadId); + } + + /** + * 批量处理用户日志 + */ + private void batchProcessUserLogs(List logJsonList, int threadId) { + if (logJsonList.isEmpty()) { + return; + } + + try { + // 确保用户角色类型缓存已加载 + loadUserRoleTypeCache(); + + // 批量解析并处理 + for (String logJson : logJsonList) { + try { + LogInfoDTO logInfoDTO = PubUtils.json2obj(logJson, LogInfoDTO.class); + // 过滤无效日志 + if (logInfoDTO.getLoginName().equals(LogInfo.UNKNOWN_USER) + || logInfoDTO.getOperate().equals(LogInfo.UNKNOWN_OPERATE) + || logInfoDTO.getIp().equals(LogInfo.UNKNOWN_IP)) { + continue; + } + // 设置日志类型(业务事件=0,系统事件=1) + if ("注销".equals(logInfoDTO.getOperateType()) || "认证".equals(logInfoDTO.getOperateType())) { + logInfoDTO.setType(1); // 系统事件 + } else { + String loginName = logInfoDTO.getLoginName(); + if (StrUtil.isNotBlank(loginName) && userRoleTypeCache.containsKey(loginName)) { + if ((Integer) userRoleTypeCache.get(loginName) == 2) { + logInfoDTO.setType(0); // 业务事件 + } else { + logInfoDTO.setType(1); // 系统事件 + } + } else { + logInfoDTO.setType(1); // 默认系统事件 + } + } + // 入库 + userLogService.addUserLog(logInfoDTO); + } catch (Exception e) { + log.error("解析日志失败: {}", logJson, e); + } + } + + log.info("线程-{} 批量入库 {} 条日志", threadId, logJsonList.size()); + + } catch (Exception e) { + log.error("线程-{} 批量处理日志失败,涉及 {} 条", threadId, logJsonList.size(), e); + } + } + + /** + * 加载用户角色类型缓存 + */ + private void loadUserRoleTypeCache() { + if (Objects.isNull(userRoleTypeCache)) { + userRoleTypeCache = (JSONObject) redisUtil.getObjectByKey(RedisKeyEnum.USER_ROLE_TYPE_KEY.getKey()); + if (Objects.isNull(userRoleTypeCache)) { + // 调用 Feign 接口获取用户角色列表(会自动写入 Redis) + userFeignClient.userRoleList(); + userRoleTypeCache = (JSONObject) redisUtil.getObjectByKey(RedisKeyEnum.USER_ROLE_TYPE_KEY.getKey()); + } + } + } + + /** + * 降级处理:批量失败时逐条处理(已移除,直接在批量方法中逐条调用) + */ + private void fallbackProcessOneByOne(List logJsonList, int threadId) { + log.warn("线程-{} 批量入库失败,降级为逐条处理", threadId); + + // 确保用户角色类型缓存已加载 + loadUserRoleTypeCache(); + + for (String logJson : logJsonList) { + try { + LogInfoDTO logInfoDTO = PubUtils.json2obj(logJson, LogInfoDTO.class); + + // 过滤无效日志 + if (logInfoDTO.getLoginName().equals(LogInfo.UNKNOWN_USER) + || logInfoDTO.getOperate().equals(LogInfo.UNKNOWN_OPERATE) + || logInfoDTO.getIp().equals(LogInfo.UNKNOWN_IP)) { + continue; + } + + // 设置日志类型 + if ("注销".equals(logInfoDTO.getOperateType()) || "认证".equals(logInfoDTO.getOperateType())) { + logInfoDTO.setType(1); + } else { + String loginName = logInfoDTO.getLoginName(); + if (StrUtil.isNotBlank(loginName) && userRoleTypeCache.containsKey(loginName)) { + if ((Integer) userRoleTypeCache.get(loginName) == 2) { + logInfoDTO.setType(0); + } else { + logInfoDTO.setType(1); + } + } else { + logInfoDTO.setType(1); + } + } + + userLogService.addUserLog(logInfoDTO); + + } catch (Exception e) { + log.error("线程-{} 单条处理也失败: {}", threadId, logJson, e); + } + } + } + + /** + * 消费邮件推送(无需批量优化,邮件量少) + */ + private void consumeEmailPush() { + log.info("邮件推送消费线程启动"); + + while (running) { + try { + String pushJson = redisMessageQueueUtil.popMessage( + RedisKeyEnum.USER_LOG_EMAIL_QUEUE.getKey(), + 3 // 邮件推送不频繁,5秒超时即可 + ); + if (pushJson != null) { + log.debug("消费邮件推送: {}", pushJson); + // 解析日志DTO + LogInfoDTO auditLogVO = PubUtils.json2obj(pushJson, LogInfoDTO.class); + // 获取审计管理员的邮箱列表 + List data = userFeignClient.getUserListByRoleCode("audit_manager").getData(); + List emails = data.stream() + .filter(x -> Pattern.matches(PatternRegex.EMAIL_REGEX, x.getEmail())) + .filter(x -> StrUtil.isNotBlank(x.getEmail()) && x.getEmailNotice() == 1) + .map(User::getEmail) + .collect(Collectors.toList()); + // 发送邮件 + if (CollUtil.isNotEmpty(emails)) { + StringBuilder describe = new StringBuilder(); + describe.append(auditLogVO.getType() == 0 ? "业务事件 -> " : "系统事件 -> "); + describe.append(auditLogVO.getUserName()).append("在") + .append(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(auditLogVO.getCreateTime())) + .append("在") + .append(auditLogVO.getIp()) + .append("执行了") + .append(auditLogVO.getOperate()) + .append(",结果为"); + if (auditLogVO.getResult() == 1) { + describe.append("成功"); + } + if (auditLogVO.getResult() == 0) { + describe.append("失败").append(",失败原因为").append(auditLogVO.getFailReason()); + } + emailUtil.sendMultiple(emails, "告警消息", describe.toString(), false); + } + } + + } catch (Exception e) { + log.error("消费邮件推送失败", e); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + break; + } + } + } + + log.info("邮件推送消费线程退出"); + } + + /** + * 消费设备日志(可选,如果需要处理设备日志) + */ + private void consumeDeviceLog() { + log.info("设备日志消费线程启动"); + + while (running) { + try { + String deviceLogJson = redisMessageQueueUtil.popMessage( + RedisKeyEnum.DEVICE_LOG_QUEUE.getKey(), + 5 // 设备日志不频繁,5秒超时即可 + ); + + if (deviceLogJson != null) { + log.debug("消费设备日志: {}", deviceLogJson); + // TODO: 解析并保存设备日志 + // DeviceLog deviceLog = JSONUtil.toBean(deviceLogJson, DeviceLog.class); + // deviceLogService.save(deviceLog); + } + + } catch (Exception e) { + log.error("消费设备日志失败", e); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + break; + } + } + } + + log.info("设备日志消费线程退出"); + } + + @PreDestroy + public void shutdown() { + log.info("开始停止 Redis 日志消费者..."); + running = false; + + if (executorService != null) { + executorService.shutdown(); + try { + // 等待60秒,让队列中的消息处理完 + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + log.warn("消费线程未在60秒内完成,强制关闭"); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + log.error("等待消费线程关闭被中断", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + log.info("Redis 日志消费者已停止"); + } +} \ No newline at end of file diff --git a/pqs-system/system-boot/src/main/java/com/njcn/system/handler/MqttMessageHandler.java b/pqs-system/system-boot/src/main/java/com/njcn/system/handler/MqttMessageHandler.java index f59663ea2..9c28ef7a5 100644 --- a/pqs-system/system-boot/src/main/java/com/njcn/system/handler/MqttMessageHandler.java +++ b/pqs-system/system-boot/src/main/java/com/njcn/system/handler/MqttMessageHandler.java @@ -1,10 +1,8 @@ package com.njcn.system.handler; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject; -import com.github.tocrhz.mqtt.annotation.MqttSubscribe; import com.github.tocrhz.mqtt.annotation.Payload; import com.njcn.common.pojo.constant.LogInfo; import com.njcn.common.pojo.constant.PatternRegex; @@ -16,17 +14,14 @@ import com.njcn.system.service.IUserLogService; import com.njcn.user.api.UserFeignClient; import com.njcn.user.pojo.po.User; import com.njcn.web.utils.EmailUtil; -import io.swagger.models.auth.In; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.data.redis.core.BoundHashOperations; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.time.format.DateTimeFormatter; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -49,11 +44,12 @@ public class MqttMessageHandler { + private JSONObject jsonObject = null; /** * 订阅审计日志的记录,并进行入库操作 */ - @MqttSubscribe(value = "/userLog") +// @MqttSubscribe(value = "/userLog") public void subUserLog(String topic, MqttMessage message, @Payload String payload) { if(Objects.isNull(jsonObject)){ jsonObject = (JSONObject) redisUtil.getObjectByKey(RedisKeyEnum.USER_ROLE_TYPE_KEY.getKey()); @@ -86,7 +82,7 @@ public class MqttMessageHandler { /** * 订阅审计日志的记录,并进行入库操作 */ - @MqttSubscribe(value = "/userLogEmailPush") +// @MqttSubscribe(value = "/userLogEmailPush") public void subUserLogEmail(String topic, MqttMessage message, @Payload String payload) { LogInfoDTO auditLogVO = PubUtils.json2obj(new String(message.getPayload(), StandardCharsets.UTF_8),LogInfoDTO.class); List data = userFeignClient.getUserListByRoleCode("audit_manager").getData(); diff --git a/pqs-system/system-boot/src/main/java/com/njcn/system/service/impl/AuditServiceImpl.java b/pqs-system/system-boot/src/main/java/com/njcn/system/service/impl/AuditServiceImpl.java index 2388ddf73..471affbc1 100644 --- a/pqs-system/system-boot/src/main/java/com/njcn/system/service/impl/AuditServiceImpl.java +++ b/pqs-system/system-boot/src/main/java/com/njcn/system/service/impl/AuditServiceImpl.java @@ -21,7 +21,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.nimbusds.jose.JWSObject; import com.njcn.common.config.GeneralInfo; import com.njcn.common.pojo.constant.OperateType; @@ -37,6 +36,8 @@ import com.njcn.db.constant.DbConstant; import com.njcn.oss.constant.GeneralConstant; import com.njcn.oss.constant.OssPath; import com.njcn.oss.utils.FileStorageUtil; +import com.njcn.redis.pojo.enums.RedisKeyEnum; +import com.njcn.redis.utils.RedisMessageQueueUtil; import com.njcn.redis.utils.RedisUtil; import com.njcn.system.enums.SystemResponseEnum; import com.njcn.system.excel.UserLogExcel; @@ -93,7 +94,15 @@ public class AuditServiceImpl extends ServiceImpl implem private final IConfigService iConfigService; - private final MqttPublisher publisher; + /** + * mqtt处理日志异步发布 + */ +// private final MqttPublisher publisher; + + /** + * redis队列处理日志异步发布 + */ + private final RedisMessageQueueUtil redisMessageQueueUtil; private final FileStorageUtil fileStorageUtil; @@ -188,7 +197,11 @@ public class AuditServiceImpl extends ServiceImpl implem if (auditParam.isExport()) { methodDescribe = "审计日志列表导出"; LogInfoDTO logInfoDTO = new LogInfoDTO(loginName, userName, ip, methodDescribe, operateType, "失败".equalsIgnoreCase(result) ? 0 : 1, "", severity, "业务事件".equalsIgnoreCase(type) ? 0 : 1, generalInfo.getMicroServiceName(), userIndex, LocalDateTime.now()); - publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false); + redisMessageQueueUtil.pushMessage( + RedisKeyEnum.USER_LOG_QUEUE.getKey(), + PubUtils.obj2json(logInfoDTO) + ); +// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false); } else { if (StrUtil.isNotBlank(auditParam.getSortBy()) && StrUtil.isNotBlank(auditParam.getOrderBy())) { methodDescribe = methodDescribe.concat("并以") @@ -197,7 +210,11 @@ public class AuditServiceImpl extends ServiceImpl implem .concat(DbConstant.DESC.equalsIgnoreCase(auditParam.getOrderBy()) ? "降序" : "升序") .concat("查询"); LogInfoDTO logInfoDTO = new LogInfoDTO(loginName, userName, ip, methodDescribe, operateType, "失败".equalsIgnoreCase(result) ? 0 : 1, "", severity, "业务事件".equalsIgnoreCase(type) ? 0 : 1, generalInfo.getMicroServiceName(), userIndex, LocalDateTime.now()); - publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false); + redisMessageQueueUtil.pushMessage( + RedisKeyEnum.USER_LOG_QUEUE.getKey(), + PubUtils.obj2json(logInfoDTO) + ); +// publisher.send("/userLog", PubUtils.obj2json(logInfoDTO), 2, false); } }