UPDATE: 1、优化导出计划检测数据;2、删除导入和合并计划检测数据接口,改成导入并合并计划检测数据异步逻辑。

This commit is contained in:
贾同学
2025-09-11 11:08:30 +08:00
parent 90eb90554b
commit 46e8811e59
9 changed files with 772 additions and 261 deletions

View File

@@ -19,6 +19,7 @@ import com.njcn.gather.device.service.IPqDevService;
import com.njcn.gather.plan.pojo.param.AdPlanParam; import com.njcn.gather.plan.pojo.param.AdPlanParam;
import com.njcn.gather.plan.pojo.po.AdPlan; import com.njcn.gather.plan.pojo.po.AdPlan;
import com.njcn.gather.plan.pojo.vo.AdPlanVO; import com.njcn.gather.plan.pojo.vo.AdPlanVO;
import com.njcn.gather.plan.service.AsyncPlanHandler;
import com.njcn.gather.plan.service.IAdPlanService; import com.njcn.gather.plan.service.IAdPlanService;
import com.njcn.gather.type.pojo.po.DevType; import com.njcn.gather.type.pojo.po.DevType;
import com.njcn.gather.type.service.IDevTypeService; import com.njcn.gather.type.service.IDevTypeService;
@@ -43,6 +44,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.njcn.web.utils.RequestUtil.getUserId;
/** /**
* @author caozehui * @author caozehui
* @date 2024-12-09 * @date 2024-12-09
@@ -58,6 +61,7 @@ public class AdPlanController extends BaseController {
private final IPqDevService pqDevService; private final IPqDevService pqDevService;
private final IDevTypeService devTypeService; private final IDevTypeService devTypeService;
private final ISysUserService sysUserService; private final ISysUserService sysUserService;
private final AsyncPlanHandler asyncPlanHandler;
@OperateInfo(info = LogEnum.BUSINESS_COMMON) @OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/list") @PostMapping("/list")
@@ -175,7 +179,7 @@ public class AdPlanController extends BaseController {
public HttpResult<List<Map<String, String>>> getBigTestItem(@RequestBody AdPlanParam.CheckParam checkParam) { public HttpResult<List<Map<String, String>>> getBigTestItem(@RequestBody AdPlanParam.CheckParam checkParam) {
String methodDescribe = getMethodDescribe("getBigTestItem"); String methodDescribe = getMethodDescribe("getBigTestItem");
LogUtil.njcnDebug(log, "{},查询数据为:{}", methodDescribe, checkParam); LogUtil.njcnDebug(log, "{},查询数据为:{}", methodDescribe, checkParam);
List<Map<String, String>> result = adPlanService.getBigTestItem(checkParam.getReCheckType(), checkParam.getPlanId(), checkParam.getDevIds(), checkParam.getPatternId(),checkParam.getScriptType()); List<Map<String, String>> result = adPlanService.getBigTestItem(checkParam.getReCheckType(), checkParam.getPlanId(), checkParam.getDevIds(), checkParam.getPatternId(), checkParam.getScriptType());
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
} }
@@ -400,39 +404,6 @@ public class AdPlanController extends BaseController {
} }
@OperateInfo(info = LogEnum.BUSINESS_COMMON, operateType = OperateType.UPLOAD)
@PostMapping(value = "/importSubPlanCheckData")
@ApiOperation("导入子计划检测结果")
@ApiImplicitParams({
@ApiImplicitParam(name = "file", value = "检测计划检测结果数据文件", required = true),
@ApiImplicitParam(name = "patternId", value = "模式Id", required = true)
})
public HttpResult<Boolean> importSubPlanCheckData(@RequestParam("file") MultipartFile file, @RequestParam("patternId") String patternId, HttpServletResponse response) {
String methodDescribe = getMethodDescribe("importSubPlanCheckData");
LogUtil.njcnDebug(log, "{},上传文件为:{}", methodDescribe, file.getOriginalFilename());
boolean fileType = FileUtil.judgeFileIsZip(file.getOriginalFilename());
if (!fileType) {
CommonResponseEnum fileTypeError = CommonResponseEnum.FILE_XLSX_ERROR;
fileTypeError.setMessage("请上传zip文件");
throw new BusinessException(fileTypeError);
}
adPlanService.importSubPlanCheckDataZip(file, patternId, response);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON, operateType = OperateType.UPLOAD)
@PostMapping(value = "/mergePlanCheckData")
@ApiOperation("合并计划检测结果数据")
@ApiImplicitParam(name = "planId", value = "计划id", required = true)
public HttpResult<Boolean> mergePlanCheckData(@RequestParam("planId") String planId) {
String methodDescribe = getMethodDescribe("mergePlanCheckData");
LogUtil.njcnDebug(log, "{}合并计划ID数据为{}", methodDescribe, planId);
adPlanService.mergePlanCheckData(planId);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON) @OperateInfo(info = LogEnum.BUSINESS_COMMON)
@GetMapping("/getMemberList") @GetMapping("/getMemberList")
@ApiOperation("根据计划ID获取项目成员") @ApiOperation("根据计划ID获取项目成员")
@@ -452,5 +423,27 @@ public class AdPlanController extends BaseController {
} }
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
} }
@OperateInfo(info = LogEnum.BUSINESS_COMMON, operateType = OperateType.UPLOAD)
@PostMapping(value = "/importAndMergePlanCheckData")
@ApiOperation("合并计划检测结果数据")
@ApiImplicitParams({
@ApiImplicitParam(name = "file", value = "检测计划检测结果数据文件", required = true),
@ApiImplicitParam(name = "planId", value = "计划Id", required = true)
})
public HttpResult<Boolean> importAndMergePlanCheckData(@RequestParam("file") MultipartFile file, @RequestParam("planId") String planId) {
String methodDescribe = getMethodDescribe("importAndMergePlanCheckData");
LogUtil.njcnDebug(log, "{}合并计划ID检测数据为{}", methodDescribe, planId);
boolean fileType = FileUtil.judgeFileIsZip(file.getOriginalFilename());
if (!fileType) {
CommonResponseEnum fileTypeError = CommonResponseEnum.FILE_XLSX_ERROR;
fileTypeError.setMessage("请上传zip文件");
throw new BusinessException(fileTypeError);
}
asyncPlanHandler.importAndMergePlanCheckData(file, getUserId(), planId);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, true, methodDescribe);
}
} }

View File

@@ -0,0 +1,38 @@
package com.njcn.gather.plan.controller;
import com.njcn.gather.plan.service.SseClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import static com.njcn.web.utils.RequestUtil.getUserId;
/**
* SSE控制器用于处理SSE相关的请求。
*/
@Slf4j
@RequiredArgsConstructor
@RequestMapping("/sse")
@RestController
public class SseController {
private final SseClient sseClient;
/**
* 创建SSE连接。
*
* @return SseEmitter对象用于与客户端建立SSE连接。
*/
@GetMapping("/createSse")
public SseEmitter createConnect() {
String uid = getUserId();
SseEmitter sse = sseClient.createSse(uid);
log.info("为用户UID: {} 创建SSE连接", uid);
return sse;
}
}

View File

@@ -3,12 +3,12 @@ package com.njcn.gather.plan.pojo.vo;
import com.njcn.gather.detection.pojo.po.AdPair; import com.njcn.gather.detection.pojo.po.AdPair;
import com.njcn.gather.device.pojo.po.PqDev; import com.njcn.gather.device.pojo.po.PqDev;
import com.njcn.gather.device.pojo.po.PqDevSub; import com.njcn.gather.device.pojo.po.PqDevSub;
import com.njcn.gather.monitor.pojo.po.PqMonitor;
import com.njcn.gather.plan.pojo.po.AdPlan; import com.njcn.gather.plan.pojo.po.AdPlan;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import java.util.List; import java.util.List;
import java.util.Map;
@Data @Data
@EqualsAndHashCode(callSuper = false) @EqualsAndHashCode(callSuper = false)
@@ -18,5 +18,7 @@ public class AdPlanCheckDataVO {
private List<PqDev> devList; private List<PqDev> devList;
private List<PqDevSub> devSubList; private List<PqDevSub> devSubList;
private List<AdPair> pairList; private List<AdPair> pairList;
private Map<String, List<Map<String, Object>>> checkData; private List<PqMonitor> monitorList;
private List<String> devMonitorIds;
private int dataBatch;
} }

View File

@@ -0,0 +1,297 @@
package com.njcn.gather.plan.service;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.ZipUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.gather.detection.pojo.po.AdPair;
import com.njcn.gather.detection.service.IAdPariService;
import com.njcn.gather.device.pojo.po.PqDev;
import com.njcn.gather.device.pojo.po.PqDevSub;
import com.njcn.gather.device.service.IPqDevService;
import com.njcn.gather.device.service.IPqDevSubService;
import com.njcn.gather.plan.pojo.po.AdPlan;
import com.njcn.gather.plan.pojo.vo.AdPlanCheckDataVO;
import com.njcn.gather.plan.service.util.BatchFileReader;
import com.njcn.gather.system.config.handler.NonWebAutoFillValueHandler;
import com.njcn.gather.tools.report.model.constant.ReportConstant;
import com.njcn.gather.type.pojo.po.DevType;
import com.njcn.gather.type.service.IDevTypeService;
import com.njcn.web.utils.HttpResultUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@EnableAsync
@RequiredArgsConstructor
@Component
public class AsyncPlanHandler {
private final SseClient sseClient;
private final IAdPlanService adPlanService;
private final IPqDevService pqDevService;
private final IDevTypeService devTypeService;
private final IPqDevSubService pqDevSubService;
private final IAdPariService adPairService;
private final JdbcTemplate jdbcTemplate;
@Value("${report.reportDir}")
private String reportPath;
@Transactional
@Async
public void importAndMergePlanCheckData(MultipartFile file, String uid, String planId) {
NonWebAutoFillValueHandler.setCurrentUserId(uid);
AtomicInteger progress = new AtomicInteger();
try {
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始保存文件"));
// 创建临时目录用于解压文件
File tempDir = FileUtil.mkdir(FileUtil.getTmpDirPath() + "import_plan_check_data_" + System.currentTimeMillis() + "/");
// 将上传的zip文件保存到临时目录
File zipFile = FileUtil.file(tempDir, file.getOriginalFilename());
file.transferTo(zipFile);
progress.addAndGet(1);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始解压文件"));
// 解压zip文件
File unzipDir = FileUtil.mkdir(FileUtil.file(tempDir, "unzip"));
ZipUtil.unzip(zipFile.getAbsolutePath(), unzipDir.getAbsolutePath());
// 查找解压目录中的json文件
File[] files = unzipDir.listFiles();
AdPlanCheckDataVO planCheckDataVO = null;
List<File> dataFiles = new ArrayList<>();
List<File> docxFiles = new ArrayList<>();
if (files != null) {
for (File f : files) {
if (f.isFile()) {
// 读取json文件内容
if (f.getName().endsWith(".json")) {
String jsonStr = FileUtil.readUtf8String(f);
planCheckDataVO = JSONUtil.toBean(jsonStr, AdPlanCheckDataVO.class);
} else if (f.getName().endsWith(".docx")) {
docxFiles.add(f);
} else if (f.getName().endsWith(".txt")) {
dataFiles.add(f);
}
}
}
}
if (planCheckDataVO == null) {
FileUtil.del(tempDir);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "ZIP文件中未找到JSON数据文件"));
return;
}
AdPlan checkPlan = planCheckDataVO.getPlan();
if (checkPlan == null) {
FileUtil.del(tempDir);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "ZIP文件中未找到检测计划信息"));
return;
}
List<PqDev> devList = planCheckDataVO.getDevList();
if (CollUtil.isEmpty(devList)) {
FileUtil.del(tempDir);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "ZIP文件中未找到被检设备信息"));
return;
}
AdPlan subPlan = adPlanService.getById(checkPlan.getId());
if (subPlan == null) {
FileUtil.del(tempDir);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "该子计划不存在"));
return;
}
if (!StrUtil.equals(planId, subPlan.getFatherPlanId())) {
FileUtil.del(tempDir);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "该当前检修计划的子计划"));
return;
}
progress.addAndGet(1);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始同步计划基本信息"));
// 更新检测计划信息
checkPlan.setFatherPlanId(subPlan.getFatherPlanId());
checkPlan.setImportFlag(0);
adPlanService.updateById(checkPlan);
progress.addAndGet(1);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始同步计划设备信息"));
// 批量更新被检设备信息
// 不更新导入标志
devList.forEach(dev -> dev.setImportFlag(null));
pqDevService.updateBatchById(devList);
List<PqDevSub> devSubList = planCheckDataVO.getDevSubList();
for (PqDevSub devSub : devSubList) {
pqDevSubService.update(devSub, new LambdaUpdateWrapper<PqDevSub>().eq(PqDevSub::getDevId, devSub.getDevId()));
}
progress.addAndGet(1);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始同步通道配对信息"));
// 同步检测数据
List<AdPair> pairList = planCheckDataVO.getPairList();
adPairService.updateBatchById(pairList);
if (CollUtil.isNotEmpty(docxFiles)) {
progress.addAndGet(1);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始同步检测报告文件"));
for (File docx : docxFiles) {
for (PqDev dev : devList) {
DevType devType = devTypeService.getById(dev.getDevType());
String dirPath = reportPath.concat(File.separator).concat(devType.getName());
File reportFile = new File(dirPath.concat(File.separator).concat(dev.getCreateId()).concat(ReportConstant.DOCX));
// 文件名匹配,复制到对应目录下
if (docx.getName().equals(reportFile.getName())) {
File parentDir = FileUtil.mkParentDirs(reportFile);
FileUtil.copy(docx, parentDir, true);
}
}
}
}
if (CollUtil.isNotEmpty(dataFiles)) {
AdPlan plan = adPlanService.getById(planId);
Integer planCode = plan.getCode();
progress.addAndGet(1);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始同步检测数据信息"));
// 合并前清除相关表数据
String mainHarmonicTableName = "ad_harmonic_" + planCode;
String mainNonHarmonicTableName = "ad_non_harmonic_" + planCode;
String mainHarmonicResultTableName = "ad_harmonic_result_" + planCode;
String mainNonHarmonicResultTableName = "ad_non_harmonic_result_" + planCode;
List<String> devMonitorIds = planCheckDataVO.getDevMonitorIds();
if (CollUtil.isNotEmpty(devMonitorIds)) {
// 使用 StringBuilder 构建带引号的ID列表防止SQL注入
StringBuilder sb = new StringBuilder();
for (int i = 0; i < devMonitorIds.size(); i++) {
if (i > 0) sb.append(",");
sb.append("'").append(devMonitorIds.get(i)).append("'");
}
String devMonitorIdsStr = sb.toString();
jdbcTemplate.update("DELETE FROM " + mainHarmonicTableName + " WHERE dev_monitor_id IN (" + devMonitorIdsStr + ")");
jdbcTemplate.update("DELETE FROM " + mainNonHarmonicTableName + " WHERE dev_monitor_id IN (" + devMonitorIdsStr + ")");
jdbcTemplate.update("DELETE FROM " + mainHarmonicResultTableName + " WHERE dev_monitor_id IN (" + devMonitorIdsStr + ")");
jdbcTemplate.update("DELETE FROM " + mainNonHarmonicResultTableName + " WHERE dev_monitor_id IN (" + devMonitorIdsStr + ")");
}
int dataBatch = planCheckDataVO.getDataBatch();
int step = 80 / (dataBatch + 1);
for (File dataFile : dataFiles) {
// 直接插入主计划表中
String fileName = FileUtil.mainName(dataFile);
String tableName = fileName + StrUtil.UNDERLINE + planCode;
// 使用BatchFileReader分批处理文件
final boolean[] isFirstBatch = {true};
final String[] headers = {null};
BatchFileReader.readLinesInBatches(dataFile, 10000, lines -> {
progress.addAndGet(step);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "同步检测数据信息中"));
if (CollUtil.isNotEmpty(lines)) {
if (isFirstBatch[0]) {
// 第一批次的第一行为字段名
headers[0] = lines.get(0);
// 处理剩余行作为数据
if (lines.size() > 1) {
List<String> dataLines = lines.subList(1, lines.size());
processBatchData(tableName, headers[0], dataLines);
}
isFirstBatch[0] = false;
} else {
// 后续批次全部为数据行
processBatchData(tableName, headers[0], lines);
}
}
});
progress.addAndGet(1);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "" + tableName + " 数据同步完成"));
}
}
progress.addAndGet(1);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "开始合并检测数据信息"));
// 删除临时目录
FileUtil.del(tempDir);
progress.set(100);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.SUCCESS.getCode(), progress, "数据合并完成"));
} catch (Exception e) {
log.error("导入数据失败", e);
sseClient.sendMessage(uid, planId, HttpResultUtil.assembleResult(CommonResponseEnum.FAIL.getCode(), progress, "导入失败"));
} finally {
NonWebAutoFillValueHandler.clearCurrentUserId();
}
sseClient.closeSse(uid);
}
/**
* 处理数据行
*
* @param tableName 表名
* @param headers 表头
* @param lines 数据行
*/
private void processBatchData(String tableName, String headers, List<String> lines) {
if (lines == null || lines.isEmpty()) {
return;
}
// 构建INSERT语句
String[] headerArray = headers.split("\t");
String columnNames = String.join(",", headerArray);
String placeholders = String.join(",", java.util.Collections.nCopies(headerArray.length, "?"));
String sql = "INSERT INTO " + tableName + " (" + columnNames + ") VALUES (" + placeholders + ")";
// 批量插入数据
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(java.sql.PreparedStatement ps, int i) throws java.sql.SQLException {
String line = lines.get(i);
String[] fields = line.split("\t", -1); // 使用-1限制以保留空值
for (int j = 0; j < fields.length && j < headerArray.length; j++) {
String value = fields[j];
if (StrUtil.isEmpty(value) || StrUtil.equals(value, StrUtil.NULL)) {
ps.setNull(j + 1, java.sql.Types.VARCHAR);
} else {
ps.setString(j + 1, value);
}
}
}
@Override
public int getBatchSize() {
return lines.size();
}
});
}
}

View File

@@ -199,20 +199,4 @@ public interface IAdPlanService extends IService<AdPlan> {
*/ */
void exportPlanCheckDataZip(String planId, List<String> devIds, Integer report, HttpServletResponse response); void exportPlanCheckDataZip(String planId, List<String> devIds, Integer report, HttpServletResponse response);
/**
* 导入计划检测结果数据
*
* @param file
* @param patternId
* @param response
*/
boolean importSubPlanCheckDataZip(MultipartFile file, String patternId, HttpServletResponse response);
/**
* 合并计划检测结果数据
*
* @param planId
*/
boolean mergePlanCheckData(String planId);
} }

View File

@@ -0,0 +1,130 @@
package com.njcn.gather.plan.service;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class SseClient {
private static final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
private static final int RECONNECT_DELAY = 5000; // 重连延迟时间5秒
/**
* 创建SSE连接
*
* @param uid 用户唯一标识
* @return SseEmitter 实例
*/
public SseEmitter createSse(String uid) {
// 创建一个永不超时的SseEmitter
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitterMap.put(uid, sseEmitter);
// 完成后的回调记录日志并从map中移除
sseEmitter.onCompletion(() -> {
log.info("[{}]结束连接...................", uid);
sseEmitterMap.remove(uid);
});
// 超时回调,记录日志
sseEmitter.onTimeout(() -> {
log.info("[{}]连接超时,准备重连...................", uid);
scheduleReconnect(uid);
});
// 异常回调,记录日志并发送错误事件,然后重试
sseEmitter.onError(throwable -> {
log.error("[{}]连接异常,{}", uid, throwable.toString(), throwable);
try {
sseEmitter.send(SseEmitter.event()
.id(uid)
.name("发生异常!")
.data("发生异常请重试!")
.reconnectTime(RECONNECT_DELAY));
} catch (IOException e) {
log.error("[{}]发送错误事件失败", uid, e);
}
scheduleReconnect(uid);
});
try {
// 发送初始化事件,设置重连时间
sseEmitter.send(SseEmitter.event().reconnectTime(RECONNECT_DELAY));
} catch (IOException e) {
log.error("[{}]发送初始化事件失败", uid, e);
}
log.info("[{}]创建sse连接成功", uid);
return sseEmitter;
}
/**
* 给指定用户发送消息可以定时或在事件发生时调用sseEmitter.send()方法来发送事件。)
*
* @param uid 用户唯一标识
* @param messageId 消息ID
* @param message 消息内容
* @return 是否发送成功
*/
public boolean sendMessage(String uid, String messageId, Object message) {
if (ObjectUtil.isEmpty(message)) {
log.warn("参数异常message 为null");
return false;
}
SseEmitter sseEmitter = sseEmitterMap.get(uid);
if (sseEmitter == null) {
log.info("消息推送失败uid:[{}],没有创建连接,请重试。", uid);
return false;
}
try {
sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(1 * 60 * 1000L).data(message));
log.info("用户{},消息id:{},推送成功:{}", uid, messageId, message);
return true;
} catch (Exception e) {
log.error("用户{},消息id:{},推送异常:{}", uid, messageId, e.getMessage(), e);
sseEmitter.complete();
scheduleReconnect(uid);
return false;
}
}
/**
* 断开SSE连接
*
* @param uid 用户唯一标识
*/
public void closeSse(String uid) {
SseEmitter sseEmitter = sseEmitterMap.remove(uid);
if (sseEmitter != null) {
sseEmitter.complete();
log.info("用户{} 连接已关闭", uid);
} else {
log.info("用户{} 连接已关闭,或连接不存在", uid);
}
}
/**
* 计划重连
*
* @param uid 用户唯一标识
*/
private void scheduleReconnect(String uid) {
// 这里可以添加一个定时任务来尝试重连
// 例如使用Spring的@Scheduled注解或者ScheduledExecutorService
log.info("[{}]计划在{}毫秒后重连", uid, RECONNECT_DELAY);
// 模拟重连操作,实际应用中应根据业务需求实现
try {
Thread.sleep(RECONNECT_DELAY);
createSse(uid);
} catch (InterruptedException e) {
log.error("[{}]重连操作被中断", uid, e);
}
}
}

View File

@@ -6,7 +6,6 @@ import cn.afterturn.easypoi.excel.entity.ImportParams;
import cn.afterturn.easypoi.excel.entity.result.ExcelImportResult; import cn.afterturn.easypoi.excel.entity.result.ExcelImportResult;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
@@ -267,6 +266,7 @@ public class AdPlanServiceImpl extends ServiceImpl<AdPlanMapper, AdPlan> impleme
adPlan.setState(DataStateEnum.ENABLE.getCode()); adPlan.setState(DataStateEnum.ENABLE.getCode());
boolean addTestConfig = true; boolean addTestConfig = true;
boolean addTable = true;
if (StrUtil.isBlank(param.getFatherPlanId())) { if (StrUtil.isBlank(param.getFatherPlanId())) {
// 默认为顶级检测计划 // 默认为顶级检测计划
adPlan.setFatherPlanId(CommonEnum.FATHER_ID.getValue()); adPlan.setFatherPlanId(CommonEnum.FATHER_ID.getValue());
@@ -275,6 +275,7 @@ public class AdPlanServiceImpl extends ServiceImpl<AdPlanMapper, AdPlan> impleme
adPlan.setFatherPlanId(param.getFatherPlanId()); adPlan.setFatherPlanId(param.getFatherPlanId());
adPlan.setOrigin(plan.getName()); adPlan.setOrigin(plan.getName());
addTestConfig = false; addTestConfig = false;
addTable = false;
} }
adPlan.setTestState(CheckStateEnum.UNCHECKED.getValue()); adPlan.setTestState(CheckStateEnum.UNCHECKED.getValue());
adPlan.setReportState(PlanReportStateEnum.REPORT_STATE_NOT_GENERATED.getValue()); adPlan.setReportState(PlanReportStateEnum.REPORT_STATE_NOT_GENERATED.getValue());
@@ -324,8 +325,10 @@ public class AdPlanServiceImpl extends ServiceImpl<AdPlanMapper, AdPlan> impleme
// 关联检测源 // 关联检测源
adPlanSourceService.addAdPlanSource(planId, param.getSourceIds()); adPlanSourceService.addAdPlanSource(planId, param.getSourceIds());
} }
if (addTable) {
tableGenService.deleteTable(Collections.singletonList(adPlan.getCode().toString())); tableGenService.deleteTable(Collections.singletonList(adPlan.getCode().toString()));
tableGenService.genTable(adPlan.getCode().toString(), PatternEnum.CONTRAST.getValue().equals(dictData.getCode())); tableGenService.genTable(adPlan.getCode().toString(), PatternEnum.CONTRAST.getValue().equals(dictData.getCode()));
}
return true; return true;
} }
@@ -1900,33 +1903,81 @@ public class AdPlanServiceImpl extends ServiceImpl<AdPlanMapper, AdPlan> impleme
planCheckDataVO.setDevSubList(devSubList); planCheckDataVO.setDevSubList(devSubList);
// 被检设备监测点信息 // 被检设备监测点信息
List<PqMonitor> monitorList = pqMonitorService.list(new LambdaQueryWrapper<PqMonitor>().in(PqMonitor::getDevId, devIdList)); List<PqMonitor> monitorList = pqMonitorService.list(new LambdaQueryWrapper<PqMonitor>().in(PqMonitor::getDevId, devIdList));
List<String> monitorIds = monitorList.stream().map(PqMonitor::getId).collect(Collectors.toList()); planCheckDataVO.setMonitorList(monitorList);
// devMonitorId = 被检设备ID+通道号
List<String> devMonitorIds = new ArrayList<>();
for (PqDev dev : devList) {
List<String> channelNoList = StrUtil.split(dev.getInspectChannel(), StrUtil.COMMA);
for (String channelNo : channelNoList) {
devMonitorIds.add(dev.getId() + StrUtil.UNDERLINE + channelNo);
}
}
planCheckDataVO.setDevMonitorIds(devMonitorIds);
// 设备通道匹对关系 // 设备通道匹对关系
List<AdPair> pairList = adPairService.list(new LambdaQueryWrapper<AdPair>().eq(AdPair::getPlanId, planId).in(AdPair::getDevMonitorId, monitorIds)); List<AdPair> pairList = adPairService.list(new LambdaQueryWrapper<AdPair>().eq(AdPair::getPlanId, planId).in(AdPair::getDevMonitorId, devMonitorIds));
planCheckDataVO.setPairList(pairList); planCheckDataVO.setPairList(pairList);
// 获取计划检测结果数据表以及数据 // 获取计划检测结果数据表以及数据
Integer code = plan.getCode(); Integer code = plan.getCode();
Map<String, List<Map<String, Object>>> checkData = new HashMap<>();
List<String> dataTableNames = CollUtil.newArrayList("ad_harmonic_" + code, "ad_non_harmonic_" + code, "ad_harmonic_result_" + code, "ad_non_harmonic_result_" + code); List<String> dataTableNames = CollUtil.newArrayList("ad_harmonic_" + code, "ad_non_harmonic_" + code, "ad_harmonic_result_" + code, "ad_non_harmonic_result_" + code);
if (CollUtil.isNotEmpty(monitorIds)) {
// 创建临时目录用于存储txt文件
File tempDataDir = FileUtil.mkdir(FileUtil.getTmpDirPath() + "plan_data_" + System.currentTimeMillis() + "/");
List<File> dataFiles = new ArrayList<>();
int dataBatch = 0;
if (CollUtil.isNotEmpty(pairList)) {
for (String dataTableName : dataTableNames) { for (String dataTableName : dataTableNames) {
StringBuilder sql = new StringBuilder("select * from " + dataTableName); // 创建数据文件
String fileName = dataTableName.replace("_" + code, "") + ".txt";
File dataFile = FileUtil.file(tempDataDir, fileName);
// 确保文件存在
FileUtil.touch(dataFile);
sql.append(" where Dev_Monitor_Id in ("); // 初始化写入标志,用于判断是否已写入字段名
for (int i = 0; i < monitorIds.size(); i++) { boolean isFirstWrite = true;
sql.append("'").append(monitorIds.get(i)).append("'");
if (i < monitorIds.size() - 1) {
sql.append(",");
}
}
sql.append(")");
List<Map<String, Object>> dataList = jdbcTemplate.queryForList(sql.toString()); // 分页查询,避免一次性加载大量数据
checkData.put(dataTableName, dataList); int pageSize = 10000; // 每页查询10000条记录
int offset = 0;
List<Map<String, Object>> pageData;
do {
dataBatch += 1;
String paginatedSql = buildPaginatedQuery(dataTableName, devMonitorIds, pageSize, offset);
pageData = jdbcTemplate.queryForList(paginatedSql);
// 将当前页数据追加到文件中
if (CollUtil.isNotEmpty(pageData)) {
StringBuilder content = new StringBuilder();
// 如果是第一次写入,先写入字段名
if (isFirstWrite) {
// 获取字段名
Map<String, Object> firstRow = pageData.get(0);
List<String> fieldNames = new ArrayList<>(firstRow.keySet());
// 写入字段名作为第一行
content.append(StrUtil.join("\t", fieldNames)).append(System.lineSeparator());
isFirstWrite = false;
}
// 写入数据行
for (Map<String, Object> data : pageData) {
List<Object> values = new ArrayList<>(data.values());
content.append(StrUtil.join("\t", values)).append(System.lineSeparator());
}
// 追加内容到文件
FileUtil.appendUtf8String(content.toString(), dataFile);
}
offset += pageSize;
} while (pageData.size() == pageSize); // 如果查询结果少于pageSize说明已经查询完所有数据
// 如果文件存在且不为空,则添加到数据文件列表中
if (FileUtil.exist(dataFile) && FileUtil.size(dataFile) > 0) {
dataFiles.add(dataFile);
} }
} }
planCheckDataVO.setCheckData(checkData); }
planCheckDataVO.setDataBatch(dataBatch);
// 导出数据.zip文件 // 导出数据.zip文件
String jsonStr = JSONUtil.toJsonStr(planCheckDataVO, new JSONConfig().setIgnoreNullValue(false)); String jsonStr = JSONUtil.toJsonStr(planCheckDataVO, new JSONConfig().setIgnoreNullValue(false));
@@ -1943,10 +1994,16 @@ public class AdPlanServiceImpl extends ServiceImpl<AdPlanMapper, AdPlan> impleme
String zipFileName = URLEncoder.encode(plan.getName() + "_检测数据.zip", "UTF-8"); String zipFileName = URLEncoder.encode(plan.getName() + "_检测数据.zip", "UTF-8");
File zipFile = FileUtil.file(tempDir, zipFileName); File zipFile = FileUtil.file(tempDir, zipFileName);
// 创建一个临时目录存放文件 // 创建一个临时目录存放所有文件
File tempZipDir = FileUtil.mkdir(FileUtil.getTmpDirPath() + "temp_plan_check_data_" + System.currentTimeMillis() + "/"); File tempZipDir = FileUtil.mkdir(FileUtil.getTmpDirPath() + "temp_plan_check_data_" + System.currentTimeMillis() + "/");
// 复制json文件到临时目录 // 复制json文件到临时目录
FileUtil.copy(jsonFile, tempZipDir, true); FileUtil.copy(jsonFile, tempZipDir, true);
// 复制数据txt文件到临时目录
for (File dataFile : dataFiles) {
FileUtil.copy(dataFile, tempZipDir, true);
}
// 添加检测报告文件 // 添加检测报告文件
if (ObjectUtil.isNotNull(report) && report.equals(1)) { if (ObjectUtil.isNotNull(report) && report.equals(1)) {
for (PqDev dev : devList) { for (PqDev dev : devList) {
@@ -1961,11 +2018,14 @@ public class AdPlanServiceImpl extends ServiceImpl<AdPlanMapper, AdPlan> impleme
} }
} }
// 重新创建zip文件包含所有文件 // 重新创建zip文件包含所有文件
ZipUtil.zip(tempZipDir.getAbsolutePath(), zipFile.getAbsolutePath()); ZipUtil.zip(tempZipDir.getAbsolutePath(), zipFile.getAbsolutePath());
// 删除临时目录 // 删除临时目录
FileUtil.del(tempZipDir); FileUtil.del(tempZipDir);
FileUtil.del(tempDataDir);
// 设置响应头 // 设置响应头
response.reset(); response.reset();
response.setContentType("application/octet-stream;charset=UTF-8"); response.setContentType("application/octet-stream;charset=UTF-8");
@@ -1985,199 +2045,24 @@ public class AdPlanServiceImpl extends ServiceImpl<AdPlanMapper, AdPlan> impleme
} }
} }
@Transactional // 构建分页查询SQL
@Override private String buildPaginatedQuery(String tableName, List<String> devMonitorIds, int limit, int offset) {
public boolean importSubPlanCheckDataZip(MultipartFile file, String patternId, HttpServletResponse response) { StringBuilder sql = new StringBuilder("SELECT * FROM " + tableName);
try {
// 创建临时目录用于解压文件
File tempDir = FileUtil.mkdir(FileUtil.getTmpDirPath() + "import_plan_check_data_" + System.currentTimeMillis() + "/");
// 将上传的zip文件保存到临时目录 sql.append(" WHERE Dev_Monitor_Id IN (");
File zipFile = FileUtil.file(tempDir, file.getOriginalFilename()); for (int i = 0; i < devMonitorIds.size(); i++) {
file.transferTo(zipFile); sql.append("'").append(devMonitorIds.get(i)).append("'");
if (i < devMonitorIds.size() - 1) {
// 解压zip文件 sql.append(",");
File unzipDir = FileUtil.mkdir(FileUtil.file(tempDir, "unzip"));
ZipUtil.unzip(zipFile.getAbsolutePath(), unzipDir.getAbsolutePath());
// 查找解压目录中的json文件
File[] files = unzipDir.listFiles();
AdPlanCheckDataVO planCheckDataVO = null;
if (files != null) {
for (File f : files) {
if (f.isFile() && f.getName().endsWith(".json")) {
// 读取json文件内容
String jsonStr = FileUtil.readUtf8String(f);
planCheckDataVO = JSONUtil.toBean(jsonStr, AdPlanCheckDataVO.class);
break;
} }
} }
sql.append(")");
// 添加分页限制
sql.append(" LIMIT ").append(limit).append(" OFFSET ").append(offset);
return sql.toString();
} }
if (planCheckDataVO == null) {
FileUtil.del(tempDir);
throw new BusinessException(CommonResponseEnum.FAIL, "ZIP文件中未找到JSON文件");
}
AdPlan checkPlan = planCheckDataVO.getPlan();
if (checkPlan == null) {
FileUtil.del(tempDir);
throw new BusinessException(CommonResponseEnum.FAIL, "ZIP文件中未找到检测计划信息");
}
// 检查导入的计划是否属于当前模式
if (!StrUtil.equals(checkPlan.getPattern(), patternId)) {
FileUtil.del(tempDir);
throw new BusinessException(CommonResponseEnum.FAIL, "该检修计划当前模式不支持导入");
}
List<PqDev> devList = planCheckDataVO.getDevList();
if (CollUtil.isEmpty(devList)) {
FileUtil.del(tempDir);
throw new BusinessException(CommonResponseEnum.FAIL, "该检修计划未找到被检设备信息");
}
AdPlan subPlan = this.getById(checkPlan.getId());
if (subPlan == null) {
FileUtil.del(tempDir);
throw new BusinessException(CommonResponseEnum.FAIL, "该检修计划不存在");
}
// 更新检测计划信息
checkPlan.setFatherPlanId(subPlan.getFatherPlanId());
this.updateById(checkPlan);
// 批量更新被检设备信息
// 不更新导入标志
devList.forEach(dev -> dev.setImportFlag(null));
pqDevService.updateBatchById(devList);
List<PqDevSub> devSubList = planCheckDataVO.getDevSubList();
for (PqDevSub devSub : devSubList) {
pqDevSubService.update(devSub, new LambdaUpdateWrapper<PqDevSub>().eq(PqDevSub::getDevId, devSub.getDevId()));
}
// 同步检测数据
List<AdPair> pairList = planCheckDataVO.getPairList();
adPairService.updateBatchById(pairList);
Map<String, List<Map<String, Object>>> checkData = planCheckDataVO.getCheckData();
// 批量插入数据
for (Map.Entry<String, List<Map<String, Object>>> entry : checkData.entrySet()) {
String tableName = entry.getKey();
List<Map<String, Object>> dataList = entry.getValue();
if (CollUtil.isNotEmpty(dataList)) {
for (Map<String, Object> row : dataList) {
// 构造批量插入SQL - 收集所有可能的列名
List<String> columnNames = new ArrayList<>(row.keySet());
String sql = "INSERT INTO " + tableName + " (" +
columnNames.stream().map(column -> "`" + column + "`").collect(Collectors.joining(",")) +
") VALUES " +
"(" + columnNames.stream().map(column -> "?").collect(Collectors.joining(",")) + ")";
List<Object> params = new ArrayList<>();
for (String column : columnNames) {
Object value = row.getOrDefault(column, null);
if (value != null && column.equals("Time_Id")) {
params.add(DateUtil.toLocalDateTime(new Date(Long.parseLong(value.toString()))));
} else {
params.add(value);
}
}
jdbcTemplate.update(sql, params.toArray());
}
}
}
// 报告文件复制到指定位置
for (File f : files) {
if (f.isFile() && f.getName().endsWith(ReportConstant.DOCX)) {
for (PqDev dev : devList) {
DevType devType = devTypeService.getById(dev.getDevType());
String dirPath = reportPath.concat(File.separator).concat(devType.getName());
File reportFile = new File(dirPath.concat(File.separator).concat(dev.getCreateId()).concat(ReportConstant.DOCX));
// 文件名匹配,复制到对应目录下
if (f.getName().equals(reportFile.getName())) {
File parentDir = reportFile.getParentFile();
if (!parentDir.exists()) {
parentDir.mkdirs();
}
FileUtil.copy(f, parentDir, true);
}
}
}
}
// 删除临时目录
FileUtil.del(tempDir);
return true;
} catch (Exception e) {
log.error("导入子计划检测结果信息zip失败: ", e);
throw new BusinessException(CommonResponseEnum.FAIL, "导入失败");
}
}
@Transactional
@Override
public boolean mergePlanCheckData(String planId) {
AdPlan plan = this.getById(planId);
if (!plan.getFatherPlanId().equals(CommonEnum.FATHER_ID.getValue())) {
throw new BusinessException(CommonResponseEnum.FAIL, "该计划非主计划");
}
Integer planCode = plan.getCode();
// 获取所有子计划
List<AdPlan> subPlanList = this.lambdaQuery().eq(AdPlan::getFatherPlanId, planId).list();
if (CollUtil.isEmpty(subPlanList)) {
throw new BusinessException(CommonResponseEnum.FAIL, "该计划未找到子计划");
}
// 合并前清除相关表数据
String mainHarmonicTableName = "ad_harmonic_" + planCode;
String mainNonHarmonicTableName = "ad_non_harmonic_" + planCode;
String mainHarmonicResultTableName = "ad_harmonic_result_" + planCode;
String mainNonHarmonicResultTableName = "ad_non_harmonic_result_" + planCode;
jdbcTemplate.update("DELETE FROM " + mainHarmonicTableName);
jdbcTemplate.update("DELETE FROM " + mainNonHarmonicTableName);
jdbcTemplate.update("DELETE FROM " + mainHarmonicResultTableName);
jdbcTemplate.update("DELETE FROM " + mainNonHarmonicResultTableName);
// 获取所有子计划检测数据
List<Integer> subPlanCodeList = subPlanList.stream().map(AdPlan::getCode).collect(Collectors.toList());
List<String> harmonicTableNames = new ArrayList<>();
List<String> nonHarmonicTableNames = new ArrayList<>();
List<String> harmonicResultTableNames = new ArrayList<>();
List<String> nonHarmonicResultTableNames = new ArrayList<>();
for (Integer code : subPlanCodeList) {
harmonicTableNames.add("ad_harmonic_" + code);
nonHarmonicTableNames.add("ad_non_harmonic_" + code);
harmonicResultTableNames.add("ad_harmonic_result_" + code);
nonHarmonicResultTableNames.add("ad_non_harmonic_result_" + code);
}
// 将子计划的谐波数据插入到主计划的谐波表中
for (String harmonicTableName : harmonicTableNames) {
String sql = "INSERT INTO " + mainHarmonicTableName + " SELECT * FROM " + harmonicTableName;
jdbcTemplate.update(sql);
}
// 将子计划的非谐波数据插入到主计划的非谐波表中
for (String nonHarmonicTableName : nonHarmonicTableNames) {
String sql = "INSERT INTO " + mainNonHarmonicTableName + " SELECT * FROM " + nonHarmonicTableName;
jdbcTemplate.update(sql);
}
// 将子计划的谐波结果数据插入到主计划的谐波结果表中
for (String harmonicResultTableName : harmonicResultTableNames) {
String sql = "INSERT INTO " + mainHarmonicResultTableName + " SELECT * FROM " + harmonicResultTableName;
jdbcTemplate.update(sql);
}
// 将子计划的非谐波结果数据插入到主计划的非谐波结果表中
for (String nonHarmonicResultTableName : nonHarmonicResultTableNames) {
String sql = "INSERT INTO " + mainNonHarmonicResultTableName + " SELECT * FROM " + nonHarmonicResultTableName;
jdbcTemplate.update(sql);
}
return true;
}
} }

View File

@@ -0,0 +1,117 @@
package com.njcn.gather.plan.service.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
/**
* 分批读取文件行的工具类
* 用于处理大文件,避免一次性加载整个文件到内存中导致内存溢出
*/
public class BatchFileReader {
private static final Logger log = LoggerFactory.getLogger(BatchFileReader.class);
/**
* 默认批次大小
*/
private static final int DEFAULT_BATCH_SIZE = 10000;
/**
* 分批读取文件行
*
* @param file 要读取的文件
* @param batchSize 批次大小
* @param consumer 处理每批数据的消费者函数
*/
public static void readLinesInBatches(File file, int batchSize, Consumer<List<String>> consumer) {
readLinesInBatches(file, batchSize, Charset.defaultCharset(), consumer);
}
/**
* 分批读取文件行
*
* @param file 要读取的文件
* @param batchSize 批次大小
* @param charset 文件编码
* @param consumer 处理每批数据的消费者函数
*/
public static void readLinesInBatches(File file, int batchSize, Charset charset, Consumer<List<String>> consumer) {
if (batchSize <= 0) {
batchSize = DEFAULT_BATCH_SIZE;
}
if (!FileUtil.exist(file)) {
log.warn("文件不存在: {}", file.getAbsolutePath());
return;
}
try (BufferedReader reader = IoUtil.getReader(new InputStreamReader(FileUtil.getInputStream(file), charset))) {
List<String> batchLines = new ArrayList<>(batchSize);
String line;
int count = 0;
while ((line = reader.readLine()) != null) {
batchLines.add(line);
count++;
// 当达到批次大小时,处理这一批数据
if (count % batchSize == 0) {
if (CollUtil.isNotEmpty(batchLines)) {
consumer.accept(batchLines);
batchLines = new ArrayList<>(batchSize);
}
}
}
// 处理最后一批数据(不足一批的数据)
if (CollUtil.isNotEmpty(batchLines)) {
consumer.accept(batchLines);
}
} catch (IOException e) {
log.error("读取文件失败: {}", file.getAbsolutePath(), e);
throw new RuntimeException("读取文件失败: " + file.getAbsolutePath(), e);
}
}
/**
* 分批读取文件行(使用默认批次大小)
*
* @param file 要读取的文件
* @param consumer 处理每批数据的消费者函数
*/
public static void readLinesInBatches(File file, Consumer<List<String>> consumer) {
readLinesInBatches(file, DEFAULT_BATCH_SIZE, consumer);
}
/**
* 分批读取文件行(使用默认编码)
*
* @param filePath 文件路径
* @param batchSize 批次大小
* @param consumer 处理每批数据的消费者函数
*/
public static void readLinesInBatches(String filePath, int batchSize, Consumer<List<String>> consumer) {
readLinesInBatches(FileUtil.file(filePath), batchSize, consumer);
}
/**
* 分批读取文件行(使用默认编码和批次大小)
*
* @param filePath 文件路径
* @param consumer 处理每批数据的消费者函数
*/
public static void readLinesInBatches(String filePath, Consumer<List<String>> consumer) {
readLinesInBatches(FileUtil.file(filePath), DEFAULT_BATCH_SIZE, consumer);
}
}

View File

@@ -0,0 +1,65 @@
package com.njcn.gather.system.config.handler;
import cn.hutool.core.util.StrUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.db.mybatisplus.handler.AutoFillValueHandler;
import com.njcn.web.utils.RequestUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import java.util.function.Supplier;
@Primary
@Component
@Slf4j
public class NonWebAutoFillValueHandler extends AutoFillValueHandler {
/**
* 当前用户ID的线程本地变量用于非Web环境
*/
private static final ThreadLocal<String> CURRENT_USER_ID = new ThreadLocal<>();
@Override
public Supplier<String> getUserIdSupplier() {
return () -> {
try {
// 首先尝试从Web环境获取用户ID
String userId = RequestUtil.getUserId();
String actualUserId = StrUtil.isBlank(userId) ? "未知用户" : userId;
return actualUserId;
} catch (BusinessException e) {
// 如果是"当前请求web环境为空"异常,则尝试从线程本地变量获取
if (e.getMessage().contains("当前请求web环境为空")) {
String userId = CURRENT_USER_ID.get();
if (userId != null) {
return userId;
}
// 如果线程本地变量中也没有用户ID则返回默认值
log.warn("无法获取当前用户ID");
return "未知用户";
}
// 其他异常直接抛出
throw e;
}
};
}
/**
* 在非Web环境中设置当前用户ID
*
* @param userId 用户ID
*/
public static void setCurrentUserId(String userId) {
CURRENT_USER_ID.set(userId);
}
/**
* 清除当前线程的用户ID设置
*/
public static void clearCurrentUserId() {
CURRENT_USER_ID.remove();
}
}