diff --git a/detection/src/main/java/com/njcn/gather/plan/controller/AdPlanController.java b/detection/src/main/java/com/njcn/gather/plan/controller/AdPlanController.java index 6cb2bf9d..e30cd00b 100644 --- a/detection/src/main/java/com/njcn/gather/plan/controller/AdPlanController.java +++ b/detection/src/main/java/com/njcn/gather/plan/controller/AdPlanController.java @@ -19,6 +19,7 @@ import com.njcn.gather.device.service.IPqDevService; import com.njcn.gather.plan.pojo.param.AdPlanParam; import com.njcn.gather.plan.pojo.po.AdPlan; import com.njcn.gather.plan.pojo.vo.AdPlanVO; +import com.njcn.gather.plan.service.AsyncPlanHandler; import com.njcn.gather.plan.service.IAdPlanService; import com.njcn.gather.type.pojo.po.DevType; import com.njcn.gather.type.service.IDevTypeService; @@ -43,6 +44,8 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static com.njcn.web.utils.RequestUtil.getUserId; + /** * @author caozehui * @date 2024-12-09 @@ -58,6 +61,7 @@ public class AdPlanController extends BaseController { private final IPqDevService pqDevService; private final IDevTypeService devTypeService; private final ISysUserService sysUserService; + private final AsyncPlanHandler asyncPlanHandler; @OperateInfo(info = LogEnum.BUSINESS_COMMON) @PostMapping("/list") @@ -175,7 +179,7 @@ public class AdPlanController extends BaseController { public HttpResult>> getBigTestItem(@RequestBody AdPlanParam.CheckParam checkParam) { String methodDescribe = getMethodDescribe("getBigTestItem"); LogUtil.njcnDebug(log, "{},查询数据为:{}", methodDescribe, checkParam); - List> result = adPlanService.getBigTestItem(checkParam.getReCheckType(), checkParam.getPlanId(), checkParam.getDevIds(), checkParam.getPatternId(),checkParam.getScriptType()); + List> result = adPlanService.getBigTestItem(checkParam.getReCheckType(), checkParam.getPlanId(), checkParam.getDevIds(), checkParam.getPatternId(), checkParam.getScriptType()); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); } @@ -400,39 +404,6 @@ public class AdPlanController extends BaseController { } - @OperateInfo(info = LogEnum.BUSINESS_COMMON, operateType = OperateType.UPLOAD) - @PostMapping(value = "/importSubPlanCheckData") - @ApiOperation("导入子计划检测结果") - @ApiImplicitParams({ - @ApiImplicitParam(name = "file", value = "检测计划检测结果数据文件", required = true), - @ApiImplicitParam(name = "patternId", value = "模式Id", required = true) - }) - public HttpResult importSubPlanCheckData(@RequestParam("file") MultipartFile file, @RequestParam("patternId") String patternId, HttpServletResponse response) { - String methodDescribe = getMethodDescribe("importSubPlanCheckData"); - LogUtil.njcnDebug(log, "{},上传文件为:{}", methodDescribe, file.getOriginalFilename()); - boolean fileType = FileUtil.judgeFileIsZip(file.getOriginalFilename()); - if (!fileType) { - CommonResponseEnum fileTypeError = CommonResponseEnum.FILE_XLSX_ERROR; - fileTypeError.setMessage("请上传zip文件"); - throw new BusinessException(fileTypeError); - } - adPlanService.importSubPlanCheckDataZip(file, patternId, response); - return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, methodDescribe); - - } - - @OperateInfo(info = LogEnum.BUSINESS_COMMON, operateType = OperateType.UPLOAD) - @PostMapping(value = "/mergePlanCheckData") - @ApiOperation("合并计划检测结果数据") - @ApiImplicitParam(name = "planId", value = "计划id", required = true) - public HttpResult mergePlanCheckData(@RequestParam("planId") String planId) { - String methodDescribe = getMethodDescribe("mergePlanCheckData"); - LogUtil.njcnDebug(log, "{},合并计划ID数据为:{}", methodDescribe, planId); - adPlanService.mergePlanCheckData(planId); - return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, methodDescribe); - - } - @OperateInfo(info = LogEnum.BUSINESS_COMMON) @GetMapping("/getMemberList") @ApiOperation("根据计划ID获取项目成员") @@ -452,5 +423,27 @@ public class AdPlanController extends BaseController { } return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); } + + + @OperateInfo(info = LogEnum.BUSINESS_COMMON, operateType = OperateType.UPLOAD) + @PostMapping(value = "/importAndMergePlanCheckData") + @ApiOperation("合并计划检测结果数据") + @ApiImplicitParams({ + @ApiImplicitParam(name = "file", value = "检测计划检测结果数据文件", required = true), + @ApiImplicitParam(name = "planId", value = "计划Id", required = true) + }) + public HttpResult importAndMergePlanCheckData(@RequestParam("file") MultipartFile file, @RequestParam("planId") String planId) { + String methodDescribe = getMethodDescribe("importAndMergePlanCheckData"); + LogUtil.njcnDebug(log, "{},合并计划ID检测数据为:{}", methodDescribe, planId); + boolean fileType = FileUtil.judgeFileIsZip(file.getOriginalFilename()); + if (!fileType) { + CommonResponseEnum fileTypeError = CommonResponseEnum.FILE_XLSX_ERROR; + fileTypeError.setMessage("请上传zip文件"); + throw new BusinessException(fileTypeError); + } + asyncPlanHandler.importAndMergePlanCheckData(file, getUserId(), planId); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, methodDescribe); + + } } diff --git a/detection/src/main/java/com/njcn/gather/plan/controller/SseController.java b/detection/src/main/java/com/njcn/gather/plan/controller/SseController.java new file mode 100644 index 00000000..cfd55c60 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/plan/controller/SseController.java @@ -0,0 +1,38 @@ +package com.njcn.gather.plan.controller; + +import com.njcn.gather.plan.service.SseClient; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import static com.njcn.web.utils.RequestUtil.getUserId; + +/** + * SSE控制器,用于处理SSE相关的请求。 + */ +@Slf4j +@RequiredArgsConstructor +@RequestMapping("/sse") +@RestController +public class SseController { + + private final SseClient sseClient; + + + /** + * 创建SSE连接。 + * + * @return SseEmitter对象,用于与客户端建立SSE连接。 + */ + @GetMapping("/createSse") + public SseEmitter createConnect() { + String uid = getUserId(); + SseEmitter sse = sseClient.createSse(uid); + log.info("为用户UID: {} 创建SSE连接", uid); + return sse; + } + +} \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/plan/pojo/vo/AdPlanCheckDataVO.java b/detection/src/main/java/com/njcn/gather/plan/pojo/vo/AdPlanCheckDataVO.java index e77718ac..53c23b5a 100644 --- a/detection/src/main/java/com/njcn/gather/plan/pojo/vo/AdPlanCheckDataVO.java +++ b/detection/src/main/java/com/njcn/gather/plan/pojo/vo/AdPlanCheckDataVO.java @@ -3,12 +3,12 @@ package com.njcn.gather.plan.pojo.vo; import com.njcn.gather.detection.pojo.po.AdPair; import com.njcn.gather.device.pojo.po.PqDev; import com.njcn.gather.device.pojo.po.PqDevSub; +import com.njcn.gather.monitor.pojo.po.PqMonitor; import com.njcn.gather.plan.pojo.po.AdPlan; import lombok.Data; import lombok.EqualsAndHashCode; import java.util.List; -import java.util.Map; @Data @EqualsAndHashCode(callSuper = false) @@ -18,5 +18,7 @@ public class AdPlanCheckDataVO { private List devList; private List devSubList; private List pairList; - private Map>> checkData; + private List monitorList; + private List devMonitorIds; + private int dataBatch; } diff --git a/detection/src/main/java/com/njcn/gather/plan/service/AsyncPlanHandler.java b/detection/src/main/java/com/njcn/gather/plan/service/AsyncPlanHandler.java new file mode 100644 index 00000000..e129c81c --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/plan/service/AsyncPlanHandler.java @@ -0,0 +1,297 @@ +package com.njcn.gather.plan.service; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.ZipUtil; +import cn.hutool.json.JSONUtil; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.gather.detection.pojo.po.AdPair; +import com.njcn.gather.detection.service.IAdPariService; +import com.njcn.gather.device.pojo.po.PqDev; +import com.njcn.gather.device.pojo.po.PqDevSub; +import com.njcn.gather.device.service.IPqDevService; +import com.njcn.gather.device.service.IPqDevSubService; +import com.njcn.gather.plan.pojo.po.AdPlan; +import com.njcn.gather.plan.pojo.vo.AdPlanCheckDataVO; +import com.njcn.gather.plan.service.util.BatchFileReader; +import com.njcn.gather.system.config.handler.NonWebAutoFillValueHandler; +import com.njcn.gather.tools.report.model.constant.ReportConstant; +import com.njcn.gather.type.pojo.po.DevType; +import com.njcn.gather.type.service.IDevTypeService; +import com.njcn.web.utils.HttpResultUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.multipart.MultipartFile; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +@EnableAsync +@RequiredArgsConstructor +@Component +public class AsyncPlanHandler { + + private final SseClient sseClient; + private final IAdPlanService adPlanService; + private final IPqDevService pqDevService; + private final IDevTypeService devTypeService; + private final IPqDevSubService pqDevSubService; + + private final IAdPariService adPairService; + + private final JdbcTemplate jdbcTemplate; + @Value("${report.reportDir}") + private String reportPath; + + @Transactional + @Async + public void importAndMergePlanCheckData(MultipartFile file, String uid, String planId) { + NonWebAutoFillValueHandler.setCurrentUserId(uid); + AtomicInteger progress = new AtomicInteger(); + try { + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始保存文件")); + // 创建临时目录用于解压文件 + File tempDir = FileUtil.mkdir(FileUtil.getTmpDirPath() + "import_plan_check_data_" + System.currentTimeMillis() + "/"); + + // 将上传的zip文件保存到临时目录 + File zipFile = FileUtil.file(tempDir, file.getOriginalFilename()); + file.transferTo(zipFile); + progress.addAndGet(1); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始解压文件")); + + // 解压zip文件 + File unzipDir = FileUtil.mkdir(FileUtil.file(tempDir, "unzip")); + ZipUtil.unzip(zipFile.getAbsolutePath(), unzipDir.getAbsolutePath()); + + // 查找解压目录中的json文件 + File[] files = unzipDir.listFiles(); + AdPlanCheckDataVO planCheckDataVO = null; + List dataFiles = new ArrayList<>(); + List docxFiles = new ArrayList<>(); + if (files != null) { + for (File f : files) { + if (f.isFile()) { + // 读取json文件内容 + if (f.getName().endsWith(".json")) { + String jsonStr = FileUtil.readUtf8String(f); + planCheckDataVO = JSONUtil.toBean(jsonStr, AdPlanCheckDataVO.class); + } else if (f.getName().endsWith(".docx")) { + docxFiles.add(f); + } else if (f.getName().endsWith(".txt")) { + dataFiles.add(f); + } + } + } + } + + if (planCheckDataVO == null) { + FileUtil.del(tempDir); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "ZIP文件中未找到JSON数据文件")); + return; + } + AdPlan checkPlan = planCheckDataVO.getPlan(); + if (checkPlan == null) { + FileUtil.del(tempDir); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "ZIP文件中未找到检测计划信息")); + return; + } + + List devList = planCheckDataVO.getDevList(); + if (CollUtil.isEmpty(devList)) { + FileUtil.del(tempDir); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "ZIP文件中未找到被检设备信息")); + return; + } + + AdPlan subPlan = adPlanService.getById(checkPlan.getId()); + if (subPlan == null) { + FileUtil.del(tempDir); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "该子计划不存在")); + return; + } + if (!StrUtil.equals(planId, subPlan.getFatherPlanId())) { + FileUtil.del(tempDir); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "该当前检修计划的子计划")); + return; + } + progress.addAndGet(1); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始同步计划基本信息")); + // 更新检测计划信息 + checkPlan.setFatherPlanId(subPlan.getFatherPlanId()); + checkPlan.setImportFlag(0); + adPlanService.updateById(checkPlan); + progress.addAndGet(1); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始同步计划设备信息")); + + // 批量更新被检设备信息 + // 不更新导入标志 + devList.forEach(dev -> dev.setImportFlag(null)); + pqDevService.updateBatchById(devList); + + List devSubList = planCheckDataVO.getDevSubList(); + for (PqDevSub devSub : devSubList) { + pqDevSubService.update(devSub, new LambdaUpdateWrapper().eq(PqDevSub::getDevId, devSub.getDevId())); + } + progress.addAndGet(1); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始同步通道配对信息")); + + // 同步检测数据 + List pairList = planCheckDataVO.getPairList(); + adPairService.updateBatchById(pairList); + + + if (CollUtil.isNotEmpty(docxFiles)) { + progress.addAndGet(1); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始同步检测报告文件")); + for (File docx : docxFiles) { + for (PqDev dev : devList) { + DevType devType = devTypeService.getById(dev.getDevType()); + String dirPath = reportPath.concat(File.separator).concat(devType.getName()); + File reportFile = new File(dirPath.concat(File.separator).concat(dev.getCreateId()).concat(ReportConstant.DOCX)); + // 文件名匹配,复制到对应目录下 + if (docx.getName().equals(reportFile.getName())) { + File parentDir = FileUtil.mkParentDirs(reportFile); + FileUtil.copy(docx, parentDir, true); + } + + } + } + } + if (CollUtil.isNotEmpty(dataFiles)) { + AdPlan plan = adPlanService.getById(planId); + Integer planCode = plan.getCode(); + progress.addAndGet(1); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始同步检测数据信息")); + // 合并前清除相关表数据 + String mainHarmonicTableName = "ad_harmonic_" + planCode; + String mainNonHarmonicTableName = "ad_non_harmonic_" + planCode; + String mainHarmonicResultTableName = "ad_harmonic_result_" + planCode; + String mainNonHarmonicResultTableName = "ad_non_harmonic_result_" + planCode; + List devMonitorIds = planCheckDataVO.getDevMonitorIds(); + if (CollUtil.isNotEmpty(devMonitorIds)) { + // 使用 StringBuilder 构建带引号的ID列表,防止SQL注入 + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < devMonitorIds.size(); i++) { + if (i > 0) sb.append(","); + sb.append("'").append(devMonitorIds.get(i)).append("'"); + } + String devMonitorIdsStr = sb.toString(); + + jdbcTemplate.update("DELETE FROM " + mainHarmonicTableName + " WHERE dev_monitor_id IN (" + devMonitorIdsStr + ")"); + jdbcTemplate.update("DELETE FROM " + mainNonHarmonicTableName + " WHERE dev_monitor_id IN (" + devMonitorIdsStr + ")"); + jdbcTemplate.update("DELETE FROM " + mainHarmonicResultTableName + " WHERE dev_monitor_id IN (" + devMonitorIdsStr + ")"); + jdbcTemplate.update("DELETE FROM " + mainNonHarmonicResultTableName + " WHERE dev_monitor_id IN (" + devMonitorIdsStr + ")"); + } + int dataBatch = planCheckDataVO.getDataBatch(); + int step = 80 / (dataBatch + 1); + for (File dataFile : dataFiles) { + // 直接插入主计划表中 + String fileName = FileUtil.mainName(dataFile); + + String tableName = fileName + StrUtil.UNDERLINE + planCode; + + // 使用BatchFileReader分批处理文件 + final boolean[] isFirstBatch = {true}; + final String[] headers = {null}; + + BatchFileReader.readLinesInBatches(dataFile, 10000, lines -> { + progress.addAndGet(step); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "同步检测数据信息中")); + + if (CollUtil.isNotEmpty(lines)) { + if (isFirstBatch[0]) { + // 第一批次的第一行为字段名 + headers[0] = lines.get(0); + // 处理剩余行作为数据 + if (lines.size() > 1) { + List dataLines = lines.subList(1, lines.size()); + processBatchData(tableName, headers[0], dataLines); + } + isFirstBatch[0] = false; + } else { + // 后续批次全部为数据行 + processBatchData(tableName, headers[0], lines); + } + } + }); + + + progress.addAndGet(1); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "表 " + tableName + " 数据同步完成")); + } + } + + progress.addAndGet(1); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始合并检测数据信息")); + + // 删除临时目录 + FileUtil.del(tempDir); + progress.set(100); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "数据合并完成")); + + } catch (Exception e) { + log.error("导入数据失败", e); + sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "导入失败")); + } finally { + NonWebAutoFillValueHandler.clearCurrentUserId(); + } + + + sseClient.closeSse(uid); + + } + + + /** + * 处理数据行 + * + * @param tableName 表名 + * @param headers 表头 + * @param lines 数据行 + */ + private void processBatchData(String tableName, String headers, List lines) { + if (lines == null || lines.isEmpty()) { + return; + } + // 构建INSERT语句 + String[] headerArray = headers.split("\t"); + String columnNames = String.join(",", headerArray); + String placeholders = String.join(",", java.util.Collections.nCopies(headerArray.length, "?")); + String sql = "INSERT INTO " + tableName + " (" + columnNames + ") VALUES (" + placeholders + ")"; + + // 批量插入数据 + jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() { + @Override + public void setValues(java.sql.PreparedStatement ps, int i) throws java.sql.SQLException { + String line = lines.get(i); + String[] fields = line.split("\t", -1); // 使用-1限制以保留空值 + + for (int j = 0; j < fields.length && j < headerArray.length; j++) { + String value = fields[j]; + if (StrUtil.isEmpty(value) || StrUtil.equals(value, StrUtil.NULL)) { + ps.setNull(j + 1, java.sql.Types.VARCHAR); + } else { + ps.setString(j + 1, value); + } + } + } + + @Override + public int getBatchSize() { + return lines.size(); + } + }); + } +} diff --git a/detection/src/main/java/com/njcn/gather/plan/service/IAdPlanService.java b/detection/src/main/java/com/njcn/gather/plan/service/IAdPlanService.java index cc3f7732..72e4cf2a 100644 --- a/detection/src/main/java/com/njcn/gather/plan/service/IAdPlanService.java +++ b/detection/src/main/java/com/njcn/gather/plan/service/IAdPlanService.java @@ -199,20 +199,4 @@ public interface IAdPlanService extends IService { */ void exportPlanCheckDataZip(String planId, List devIds, Integer report, HttpServletResponse response); - /** - * 导入计划检测结果数据 - * - * @param file - * @param patternId - * @param response - */ - boolean importSubPlanCheckDataZip(MultipartFile file, String patternId, HttpServletResponse response); - - /** - * 合并计划检测结果数据 - * - * @param planId - */ - boolean mergePlanCheckData(String planId); - } diff --git a/detection/src/main/java/com/njcn/gather/plan/service/SseClient.java b/detection/src/main/java/com/njcn/gather/plan/service/SseClient.java new file mode 100644 index 00000000..6f7a157d --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/plan/service/SseClient.java @@ -0,0 +1,130 @@ +package com.njcn.gather.plan.service; + +import cn.hutool.core.util.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +@Slf4j +@Component +public class SseClient { + private static final Map sseEmitterMap = new ConcurrentHashMap<>(); + private static final int RECONNECT_DELAY = 5000; // 重连延迟时间,5秒 + + /** + * 创建SSE连接 + * + * @param uid 用户唯一标识 + * @return SseEmitter 实例 + */ + public SseEmitter createSse(String uid) { + // 创建一个永不超时的SseEmitter + SseEmitter sseEmitter = new SseEmitter(0L); + sseEmitterMap.put(uid, sseEmitter); + + // 完成后的回调,记录日志并从map中移除 + sseEmitter.onCompletion(() -> { + log.info("[{}]结束连接...................", uid); + sseEmitterMap.remove(uid); + }); + + // 超时回调,记录日志 + sseEmitter.onTimeout(() -> { + log.info("[{}]连接超时,准备重连...................", uid); + scheduleReconnect(uid); + }); + + // 异常回调,记录日志并发送错误事件,然后重试 + sseEmitter.onError(throwable -> { + log.error("[{}]连接异常,{}", uid, throwable.toString(), throwable); + try { + sseEmitter.send(SseEmitter.event() + .id(uid) + .name("发生异常!") + .data("发生异常请重试!") + .reconnectTime(RECONNECT_DELAY)); + } catch (IOException e) { + log.error("[{}]发送错误事件失败", uid, e); + } + scheduleReconnect(uid); + }); + + try { + // 发送初始化事件,设置重连时间 + sseEmitter.send(SseEmitter.event().reconnectTime(RECONNECT_DELAY)); + } catch (IOException e) { + log.error("[{}]发送初始化事件失败", uid, e); + } + + log.info("[{}]创建sse连接成功!", uid); + return sseEmitter; + } + + /** + * 给指定用户发送消息(可以定时或在事件发生时调用sseEmitter.send()方法来发送事件。) + * + * @param uid 用户唯一标识 + * @param messageId 消息ID + * @param message 消息内容 + * @return 是否发送成功 + */ + public boolean sendMessage(String uid, String messageId, Object message) { + if (ObjectUtil.isEmpty(message)) { + log.warn("参数异常,message 为null"); + return false; + } + SseEmitter sseEmitter = sseEmitterMap.get(uid); + if (sseEmitter == null) { + log.info("消息推送失败uid:[{}],没有创建连接,请重试。", uid); + return false; + } + try { + sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(1 * 60 * 1000L).data(message)); + log.info("用户{},消息id:{},推送成功:{}", uid, messageId, message); + return true; + } catch (Exception e) { + log.error("用户{},消息id:{},推送异常:{}", uid, messageId, e.getMessage(), e); + sseEmitter.complete(); + scheduleReconnect(uid); + return false; + } + } + + /** + * 断开SSE连接 + * + * @param uid 用户唯一标识 + */ + public void closeSse(String uid) { + SseEmitter sseEmitter = sseEmitterMap.remove(uid); + if (sseEmitter != null) { + sseEmitter.complete(); + log.info("用户{} 连接已关闭", uid); + } else { + log.info("用户{} 连接已关闭,或连接不存在", uid); + } + } + + /** + * 计划重连 + * + * @param uid 用户唯一标识 + */ + private void scheduleReconnect(String uid) { + // 这里可以添加一个定时任务来尝试重连 + // 例如使用Spring的@Scheduled注解或者ScheduledExecutorService + log.info("[{}]计划在{}毫秒后重连", uid, RECONNECT_DELAY); + // 模拟重连操作,实际应用中应根据业务需求实现 + try { + Thread.sleep(RECONNECT_DELAY); + createSse(uid); + } catch (InterruptedException e) { + log.error("[{}]重连操作被中断", uid, e); + } + } +} \ No newline at end of file diff --git a/detection/src/main/java/com/njcn/gather/plan/service/impl/AdPlanServiceImpl.java b/detection/src/main/java/com/njcn/gather/plan/service/impl/AdPlanServiceImpl.java index e492b0c6..aa5a3e14 100644 --- a/detection/src/main/java/com/njcn/gather/plan/service/impl/AdPlanServiceImpl.java +++ b/detection/src/main/java/com/njcn/gather/plan/service/impl/AdPlanServiceImpl.java @@ -6,7 +6,6 @@ import cn.afterturn.easypoi.excel.entity.ImportParams; import cn.afterturn.easypoi.excel.entity.result.ExcelImportResult; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.date.DateUtil; import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.ObjectUtil; @@ -267,6 +266,7 @@ public class AdPlanServiceImpl extends ServiceImpl impleme adPlan.setState(DataStateEnum.ENABLE.getCode()); boolean addTestConfig = true; + boolean addTable = true; if (StrUtil.isBlank(param.getFatherPlanId())) { // 默认为顶级检测计划 adPlan.setFatherPlanId(CommonEnum.FATHER_ID.getValue()); @@ -275,6 +275,7 @@ public class AdPlanServiceImpl extends ServiceImpl impleme adPlan.setFatherPlanId(param.getFatherPlanId()); adPlan.setOrigin(plan.getName()); addTestConfig = false; + addTable = false; } adPlan.setTestState(CheckStateEnum.UNCHECKED.getValue()); adPlan.setReportState(PlanReportStateEnum.REPORT_STATE_NOT_GENERATED.getValue()); @@ -324,8 +325,10 @@ public class AdPlanServiceImpl extends ServiceImpl impleme // 关联检测源 adPlanSourceService.addAdPlanSource(planId, param.getSourceIds()); } - tableGenService.deleteTable(Collections.singletonList(adPlan.getCode().toString())); - tableGenService.genTable(adPlan.getCode().toString(), PatternEnum.CONTRAST.getValue().equals(dictData.getCode())); + if (addTable) { + tableGenService.deleteTable(Collections.singletonList(adPlan.getCode().toString())); + tableGenService.genTable(adPlan.getCode().toString(), PatternEnum.CONTRAST.getValue().equals(dictData.getCode())); + } return true; } @@ -1900,33 +1903,81 @@ public class AdPlanServiceImpl extends ServiceImpl impleme planCheckDataVO.setDevSubList(devSubList); // 被检设备监测点信息 List monitorList = pqMonitorService.list(new LambdaQueryWrapper().in(PqMonitor::getDevId, devIdList)); - List monitorIds = monitorList.stream().map(PqMonitor::getId).collect(Collectors.toList()); + planCheckDataVO.setMonitorList(monitorList); + // devMonitorId = 被检设备ID+通道号 + List devMonitorIds = new ArrayList<>(); + for (PqDev dev : devList) { + List channelNoList = StrUtil.split(dev.getInspectChannel(), StrUtil.COMMA); + for (String channelNo : channelNoList) { + devMonitorIds.add(dev.getId() + StrUtil.UNDERLINE + channelNo); + } + } + planCheckDataVO.setDevMonitorIds(devMonitorIds); // 设备通道匹对关系 - List pairList = adPairService.list(new LambdaQueryWrapper().eq(AdPair::getPlanId, planId).in(AdPair::getDevMonitorId, monitorIds)); + List pairList = adPairService.list(new LambdaQueryWrapper().eq(AdPair::getPlanId, planId).in(AdPair::getDevMonitorId, devMonitorIds)); planCheckDataVO.setPairList(pairList); // 获取计划检测结果数据表以及数据 Integer code = plan.getCode(); - Map>> checkData = new HashMap<>(); - List dataTableNames = CollUtil.newArrayList("ad_harmonic_" + code, "ad_non_harmonic_" + code, "ad_harmonic_result_" + code, "ad_non_harmonic_result_" + code); - if (CollUtil.isNotEmpty(monitorIds)) { + + // 创建临时目录用于存储txt文件 + File tempDataDir = FileUtil.mkdir(FileUtil.getTmpDirPath() + "plan_data_" + System.currentTimeMillis() + "/"); + List dataFiles = new ArrayList<>(); + int dataBatch = 0; + if (CollUtil.isNotEmpty(pairList)) { for (String dataTableName : dataTableNames) { - StringBuilder sql = new StringBuilder("select * from " + dataTableName); + // 创建数据文件 + String fileName = dataTableName.replace("_" + code, "") + ".txt"; + File dataFile = FileUtil.file(tempDataDir, fileName); + // 确保文件存在 + FileUtil.touch(dataFile); - sql.append(" where Dev_Monitor_Id in ("); - for (int i = 0; i < monitorIds.size(); i++) { - sql.append("'").append(monitorIds.get(i)).append("'"); - if (i < monitorIds.size() - 1) { - sql.append(","); + // 初始化写入标志,用于判断是否已写入字段名 + boolean isFirstWrite = true; + + // 分页查询,避免一次性加载大量数据 + int pageSize = 10000; // 每页查询10000条记录 + int offset = 0; + List> pageData; + do { + dataBatch += 1; + String paginatedSql = buildPaginatedQuery(dataTableName, devMonitorIds, pageSize, offset); + pageData = jdbcTemplate.queryForList(paginatedSql); + + // 将当前页数据追加到文件中 + if (CollUtil.isNotEmpty(pageData)) { + StringBuilder content = new StringBuilder(); + + // 如果是第一次写入,先写入字段名 + if (isFirstWrite) { + // 获取字段名 + Map firstRow = pageData.get(0); + List fieldNames = new ArrayList<>(firstRow.keySet()); + + // 写入字段名作为第一行 + content.append(StrUtil.join("\t", fieldNames)).append(System.lineSeparator()); + isFirstWrite = false; + } + + // 写入数据行 + for (Map data : pageData) { + List values = new ArrayList<>(data.values()); + content.append(StrUtil.join("\t", values)).append(System.lineSeparator()); + } + + // 追加内容到文件 + FileUtil.appendUtf8String(content.toString(), dataFile); } - } - sql.append(")"); + offset += pageSize; + } while (pageData.size() == pageSize); // 如果查询结果少于pageSize,说明已经查询完所有数据 - List> dataList = jdbcTemplate.queryForList(sql.toString()); - checkData.put(dataTableName, dataList); + // 如果文件存在且不为空,则添加到数据文件列表中 + if (FileUtil.exist(dataFile) && FileUtil.size(dataFile) > 0) { + dataFiles.add(dataFile); + } } } - planCheckDataVO.setCheckData(checkData); + planCheckDataVO.setDataBatch(dataBatch); // 导出数据.zip文件 String jsonStr = JSONUtil.toJsonStr(planCheckDataVO, new JSONConfig().setIgnoreNullValue(false)); @@ -1943,10 +1994,16 @@ public class AdPlanServiceImpl extends ServiceImpl impleme String zipFileName = URLEncoder.encode(plan.getName() + "_检测数据.zip", "UTF-8"); File zipFile = FileUtil.file(tempDir, zipFileName); - // 创建一个临时目录存放文件 + // 创建一个临时目录存放所有文件 File tempZipDir = FileUtil.mkdir(FileUtil.getTmpDirPath() + "temp_plan_check_data_" + System.currentTimeMillis() + "/"); // 复制json文件到临时目录 FileUtil.copy(jsonFile, tempZipDir, true); + + // 复制数据txt文件到临时目录 + for (File dataFile : dataFiles) { + FileUtil.copy(dataFile, tempZipDir, true); + } + // 添加检测报告文件 if (ObjectUtil.isNotNull(report) && report.equals(1)) { for (PqDev dev : devList) { @@ -1961,11 +2018,14 @@ public class AdPlanServiceImpl extends ServiceImpl impleme } } + // 重新创建zip文件,包含所有文件 ZipUtil.zip(tempZipDir.getAbsolutePath(), zipFile.getAbsolutePath()); // 删除临时目录 FileUtil.del(tempZipDir); + FileUtil.del(tempDataDir); + // 设置响应头 response.reset(); response.setContentType("application/octet-stream;charset=UTF-8"); @@ -1985,199 +2045,24 @@ public class AdPlanServiceImpl extends ServiceImpl impleme } } - @Transactional - @Override - public boolean importSubPlanCheckDataZip(MultipartFile file, String patternId, HttpServletResponse response) { - try { - // 创建临时目录用于解压文件 - File tempDir = FileUtil.mkdir(FileUtil.getTmpDirPath() + "import_plan_check_data_" + System.currentTimeMillis() + "/"); + // 构建分页查询SQL + private String buildPaginatedQuery(String tableName, List devMonitorIds, int limit, int offset) { + StringBuilder sql = new StringBuilder("SELECT * FROM " + tableName); - // 将上传的zip文件保存到临时目录 - File zipFile = FileUtil.file(tempDir, file.getOriginalFilename()); - file.transferTo(zipFile); - - // 解压zip文件 - File unzipDir = FileUtil.mkdir(FileUtil.file(tempDir, "unzip")); - ZipUtil.unzip(zipFile.getAbsolutePath(), unzipDir.getAbsolutePath()); - - // 查找解压目录中的json文件 - File[] files = unzipDir.listFiles(); - AdPlanCheckDataVO planCheckDataVO = null; - if (files != null) { - for (File f : files) { - if (f.isFile() && f.getName().endsWith(".json")) { - // 读取json文件内容 - String jsonStr = FileUtil.readUtf8String(f); - planCheckDataVO = JSONUtil.toBean(jsonStr, AdPlanCheckDataVO.class); - break; - } - } + sql.append(" WHERE Dev_Monitor_Id IN ("); + for (int i = 0; i < devMonitorIds.size(); i++) { + sql.append("'").append(devMonitorIds.get(i)).append("'"); + if (i < devMonitorIds.size() - 1) { + sql.append(","); } - - if (planCheckDataVO == null) { - FileUtil.del(tempDir); - throw new BusinessException(CommonResponseEnum.FAIL, "ZIP文件中未找到JSON文件"); - } - AdPlan checkPlan = planCheckDataVO.getPlan(); - if (checkPlan == null) { - FileUtil.del(tempDir); - throw new BusinessException(CommonResponseEnum.FAIL, "ZIP文件中未找到检测计划信息"); - } - // 检查导入的计划是否属于当前模式 - if (!StrUtil.equals(checkPlan.getPattern(), patternId)) { - FileUtil.del(tempDir); - throw new BusinessException(CommonResponseEnum.FAIL, "该检修计划当前模式不支持导入"); - } - - List devList = planCheckDataVO.getDevList(); - if (CollUtil.isEmpty(devList)) { - FileUtil.del(tempDir); - throw new BusinessException(CommonResponseEnum.FAIL, "该检修计划未找到被检设备信息"); - } - - AdPlan subPlan = this.getById(checkPlan.getId()); - if (subPlan == null) { - FileUtil.del(tempDir); - throw new BusinessException(CommonResponseEnum.FAIL, "该检修计划不存在"); - } - // 更新检测计划信息 - checkPlan.setFatherPlanId(subPlan.getFatherPlanId()); - this.updateById(checkPlan); - - - // 批量更新被检设备信息 - // 不更新导入标志 - devList.forEach(dev -> dev.setImportFlag(null)); - pqDevService.updateBatchById(devList); - - List devSubList = planCheckDataVO.getDevSubList(); - for (PqDevSub devSub : devSubList) { - pqDevSubService.update(devSub, new LambdaUpdateWrapper().eq(PqDevSub::getDevId, devSub.getDevId())); - } - - // 同步检测数据 - List pairList = planCheckDataVO.getPairList(); - adPairService.updateBatchById(pairList); - Map>> checkData = planCheckDataVO.getCheckData(); - // 批量插入数据 - for (Map.Entry>> entry : checkData.entrySet()) { - String tableName = entry.getKey(); - List> dataList = entry.getValue(); - - if (CollUtil.isNotEmpty(dataList)) { - for (Map row : dataList) { - // 构造批量插入SQL - 收集所有可能的列名 - List columnNames = new ArrayList<>(row.keySet()); - - String sql = "INSERT INTO " + tableName + " (" + - columnNames.stream().map(column -> "`" + column + "`").collect(Collectors.joining(",")) + - ") VALUES " + - "(" + columnNames.stream().map(column -> "?").collect(Collectors.joining(",")) + ")"; - List params = new ArrayList<>(); - for (String column : columnNames) { - Object value = row.getOrDefault(column, null); - if (value != null && column.equals("Time_Id")) { - params.add(DateUtil.toLocalDateTime(new Date(Long.parseLong(value.toString())))); - } else { - params.add(value); - } - - } - jdbcTemplate.update(sql, params.toArray()); - } - } - } - - // 报告文件复制到指定位置 - for (File f : files) { - if (f.isFile() && f.getName().endsWith(ReportConstant.DOCX)) { - for (PqDev dev : devList) { - DevType devType = devTypeService.getById(dev.getDevType()); - String dirPath = reportPath.concat(File.separator).concat(devType.getName()); - File reportFile = new File(dirPath.concat(File.separator).concat(dev.getCreateId()).concat(ReportConstant.DOCX)); - // 文件名匹配,复制到对应目录下 - if (f.getName().equals(reportFile.getName())) { - File parentDir = reportFile.getParentFile(); - if (!parentDir.exists()) { - parentDir.mkdirs(); - } - FileUtil.copy(f, parentDir, true); - } - - } - } - } - // 删除临时目录 - FileUtil.del(tempDir); - return true; - } catch (Exception e) { - log.error("导入子计划检测结果信息zip失败: ", e); - throw new BusinessException(CommonResponseEnum.FAIL, "导入失败"); } + sql.append(")"); + + // 添加分页限制 + sql.append(" LIMIT ").append(limit).append(" OFFSET ").append(offset); + + return sql.toString(); } - @Transactional - @Override - public boolean mergePlanCheckData(String planId) { - - AdPlan plan = this.getById(planId); - if (!plan.getFatherPlanId().equals(CommonEnum.FATHER_ID.getValue())) { - throw new BusinessException(CommonResponseEnum.FAIL, "该计划非主计划"); - } - Integer planCode = plan.getCode(); - // 获取所有子计划 - List subPlanList = this.lambdaQuery().eq(AdPlan::getFatherPlanId, planId).list(); - if (CollUtil.isEmpty(subPlanList)) { - throw new BusinessException(CommonResponseEnum.FAIL, "该计划未找到子计划"); - } - // 合并前清除相关表数据 - String mainHarmonicTableName = "ad_harmonic_" + planCode; - String mainNonHarmonicTableName = "ad_non_harmonic_" + planCode; - String mainHarmonicResultTableName = "ad_harmonic_result_" + planCode; - String mainNonHarmonicResultTableName = "ad_non_harmonic_result_" + planCode; - jdbcTemplate.update("DELETE FROM " + mainHarmonicTableName); - jdbcTemplate.update("DELETE FROM " + mainNonHarmonicTableName); - jdbcTemplate.update("DELETE FROM " + mainHarmonicResultTableName); - jdbcTemplate.update("DELETE FROM " + mainNonHarmonicResultTableName); - - // 获取所有子计划检测数据 - List subPlanCodeList = subPlanList.stream().map(AdPlan::getCode).collect(Collectors.toList()); - List harmonicTableNames = new ArrayList<>(); - List nonHarmonicTableNames = new ArrayList<>(); - List harmonicResultTableNames = new ArrayList<>(); - List nonHarmonicResultTableNames = new ArrayList<>(); - for (Integer code : subPlanCodeList) { - harmonicTableNames.add("ad_harmonic_" + code); - nonHarmonicTableNames.add("ad_non_harmonic_" + code); - harmonicResultTableNames.add("ad_harmonic_result_" + code); - nonHarmonicResultTableNames.add("ad_non_harmonic_result_" + code); - } - - // 将子计划的谐波数据插入到主计划的谐波表中 - for (String harmonicTableName : harmonicTableNames) { - String sql = "INSERT INTO " + mainHarmonicTableName + " SELECT * FROM " + harmonicTableName; - jdbcTemplate.update(sql); - } - - // 将子计划的非谐波数据插入到主计划的非谐波表中 - for (String nonHarmonicTableName : nonHarmonicTableNames) { - String sql = "INSERT INTO " + mainNonHarmonicTableName + " SELECT * FROM " + nonHarmonicTableName; - jdbcTemplate.update(sql); - } - - // 将子计划的谐波结果数据插入到主计划的谐波结果表中 - for (String harmonicResultTableName : harmonicResultTableNames) { - String sql = "INSERT INTO " + mainHarmonicResultTableName + " SELECT * FROM " + harmonicResultTableName; - jdbcTemplate.update(sql); - } - - // 将子计划的非谐波结果数据插入到主计划的非谐波结果表中 - for (String nonHarmonicResultTableName : nonHarmonicResultTableNames) { - String sql = "INSERT INTO " + mainNonHarmonicResultTableName + " SELECT * FROM " + nonHarmonicResultTableName; - jdbcTemplate.update(sql); - } - - return true; - } } diff --git a/detection/src/main/java/com/njcn/gather/plan/service/util/BatchFileReader.java b/detection/src/main/java/com/njcn/gather/plan/service/util/BatchFileReader.java new file mode 100644 index 00000000..be948c69 --- /dev/null +++ b/detection/src/main/java/com/njcn/gather/plan/service/util/BatchFileReader.java @@ -0,0 +1,117 @@ +package com.njcn.gather.plan.service.util; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.io.IoUtil; +import cn.hutool.core.util.StrUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +/** + * 分批读取文件行的工具类 + * 用于处理大文件,避免一次性加载整个文件到内存中导致内存溢出 + */ +public class BatchFileReader { + + private static final Logger log = LoggerFactory.getLogger(BatchFileReader.class); + + /** + * 默认批次大小 + */ + private static final int DEFAULT_BATCH_SIZE = 10000; + + /** + * 分批读取文件行 + * + * @param file 要读取的文件 + * @param batchSize 批次大小 + * @param consumer 处理每批数据的消费者函数 + */ + public static void readLinesInBatches(File file, int batchSize, Consumer> consumer) { + readLinesInBatches(file, batchSize, Charset.defaultCharset(), consumer); + } + + /** + * 分批读取文件行 + * + * @param file 要读取的文件 + * @param batchSize 批次大小 + * @param charset 文件编码 + * @param consumer 处理每批数据的消费者函数 + */ + public static void readLinesInBatches(File file, int batchSize, Charset charset, Consumer> consumer) { + if (batchSize <= 0) { + batchSize = DEFAULT_BATCH_SIZE; + } + + if (!FileUtil.exist(file)) { + log.warn("文件不存在: {}", file.getAbsolutePath()); + return; + } + + try (BufferedReader reader = IoUtil.getReader(new InputStreamReader(FileUtil.getInputStream(file), charset))) { + List batchLines = new ArrayList<>(batchSize); + String line; + int count = 0; + + while ((line = reader.readLine()) != null) { + batchLines.add(line); + count++; + + // 当达到批次大小时,处理这一批数据 + if (count % batchSize == 0) { + if (CollUtil.isNotEmpty(batchLines)) { + consumer.accept(batchLines); + batchLines = new ArrayList<>(batchSize); + } + } + } + + // 处理最后一批数据(不足一批的数据) + if (CollUtil.isNotEmpty(batchLines)) { + consumer.accept(batchLines); + } + + } catch (IOException e) { + log.error("读取文件失败: {}", file.getAbsolutePath(), e); + throw new RuntimeException("读取文件失败: " + file.getAbsolutePath(), e); + } + } + + /** + * 分批读取文件行(使用默认批次大小) + * + * @param file 要读取的文件 + * @param consumer 处理每批数据的消费者函数 + */ + public static void readLinesInBatches(File file, Consumer> consumer) { + readLinesInBatches(file, DEFAULT_BATCH_SIZE, consumer); + } + + /** + * 分批读取文件行(使用默认编码) + * + * @param filePath 文件路径 + * @param batchSize 批次大小 + * @param consumer 处理每批数据的消费者函数 + */ + public static void readLinesInBatches(String filePath, int batchSize, Consumer> consumer) { + readLinesInBatches(FileUtil.file(filePath), batchSize, consumer); + } + + /** + * 分批读取文件行(使用默认编码和批次大小) + * + * @param filePath 文件路径 + * @param consumer 处理每批数据的消费者函数 + */ + public static void readLinesInBatches(String filePath, Consumer> consumer) { + readLinesInBatches(FileUtil.file(filePath), DEFAULT_BATCH_SIZE, consumer); + } +} \ No newline at end of file diff --git a/system/src/main/java/com/njcn/gather/system/config/handler/NonWebAutoFillValueHandler.java b/system/src/main/java/com/njcn/gather/system/config/handler/NonWebAutoFillValueHandler.java new file mode 100644 index 00000000..810c2e97 --- /dev/null +++ b/system/src/main/java/com/njcn/gather/system/config/handler/NonWebAutoFillValueHandler.java @@ -0,0 +1,65 @@ +package com.njcn.gather.system.config.handler; + +import cn.hutool.core.util.StrUtil; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.db.mybatisplus.handler.AutoFillValueHandler; +import com.njcn.web.utils.RequestUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; + +import java.util.function.Supplier; + +@Primary +@Component +@Slf4j +public class NonWebAutoFillValueHandler extends AutoFillValueHandler { + + /** + * 当前用户ID的线程本地变量,用于非Web环境 + */ + private static final ThreadLocal CURRENT_USER_ID = new ThreadLocal<>(); + + + @Override + public Supplier getUserIdSupplier() { + return () -> { + try { + // 首先尝试从Web环境获取用户ID + String userId = RequestUtil.getUserId(); + String actualUserId = StrUtil.isBlank(userId) ? "未知用户" : userId; + return actualUserId; + } catch (BusinessException e) { + // 如果是"当前请求web环境为空"异常,则尝试从线程本地变量获取 + if (e.getMessage().contains("当前请求web环境为空")) { + String userId = CURRENT_USER_ID.get(); + if (userId != null) { + return userId; + } + // 如果线程本地变量中也没有用户ID,则返回默认值 + log.warn("无法获取当前用户ID"); + return "未知用户"; + } + // 其他异常直接抛出 + throw e; + } + }; + } + + /** + * 在非Web环境中设置当前用户ID + * + * @param userId 用户ID + */ + public static void setCurrentUserId(String userId) { + CURRENT_USER_ID.set(userId); + } + + /** + * 清除当前线程的用户ID设置 + */ + public static void clearCurrentUserId() { + CURRENT_USER_ID.remove(); + } + +} \ No newline at end of file