@@ -2,9 +2,12 @@ package com.njcn.gather.plan.service;
import cn.hutool.core.collection.CollUtil ;
import cn.hutool.core.io.FileUtil ;
import cn.hutool.core.util.ObjectUtil ;
import cn.hutool.core.util.StrUtil ;
import cn.hutool.core.util.ZipUtil ;
import cn.hutool.json.JSONConfig ;
import cn.hutool.json.JSONUtil ;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper ;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper ;
import com.njcn.common.pojo.enums.response.CommonResponseEnum ;
import com.njcn.gather.detection.pojo.po.AdPair ;
@@ -13,6 +16,8 @@ import com.njcn.gather.device.pojo.po.PqDev;
import com.njcn.gather.device.pojo.po.PqDevSub ;
import com.njcn.gather.device.service.IPqDevService ;
import com.njcn.gather.device.service.IPqDevSubService ;
import com.njcn.gather.monitor.pojo.po.PqMonitor ;
import com.njcn.gather.monitor.service.IPqMonitorService ;
import com.njcn.gather.plan.pojo.po.AdPlan ;
import com.njcn.gather.plan.pojo.vo.AdPlanCheckDataVO ;
import com.njcn.gather.plan.service.util.BatchFileReader ;
@@ -33,9 +38,13 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile ;
import java.io.File ;
import java.time.Duration ;
import java.time.LocalDateTime ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Map ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.stream.Collectors ;
@Slf4j
@EnableAsync
@@ -48,20 +57,185 @@ public class AsyncPlanHandler {
private final IPqDevService pqDevService ;
private final IDevTypeService devTypeService ;
private final IPqDevSubService pqDevSubService ;
private final IPqMonitorService pqMonitorService ;
private final IAdPariService adPairService ;
private final JdbcTemplate jdbcTemplate ;
@Value ( " ${report.reportDir} " )
private String reportPath ;
@Value ( " ${data.homeDir} " )
private String dataPath ;
private static final int BATCH_SIZE = 10000 ;
private static final int FINAL_STEP = 85 ;
private static final String TEST_DATA_DIR = " plan_test_data " ;
@Async
public void exportPlanCheckDataZip ( String uid , String planId , List < PqDev > devList , Integer report ) {
NonWebAutoFillValueHandler . setCurrentUserId ( uid ) ;
LocalDateTime startTime = LocalDateTime . now ( ) ;
AdPlanCheckDataVO planCheckDataVO = new AdPlanCheckDataVO ( ) ;
AtomicInteger progress = new AtomicInteger ( 0 ) ;
AtomicInteger currentProgress = new AtomicInteger ( 0 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始导出文件 " ) ) ;
// 获取检测计划基本数据
AdPlan plan = adPlanService . getById ( planId ) ;
planCheckDataVO . setPlan ( plan ) ;
planCheckDataVO . setDevList ( devList ) ;
List < String > devIdList = devList . stream ( ) . map ( PqDev : : getId ) . collect ( Collectors . toList ( ) ) ;
// 被检设备状态统计
List < PqDevSub > devSubList = pqDevSubService . list ( new LambdaQueryWrapper < PqDevSub > ( ) . in ( PqDevSub : : getDevId , devIdList ) ) ;
planCheckDataVO . setDevSubList ( devSubList ) ;
// 被检设备监测点信息
List < PqMonitor > monitorList = pqMonitorService . list ( new LambdaQueryWrapper < PqMonitor > ( ) . in ( PqMonitor : : getDevId , devIdList ) ) ;
planCheckDataVO . setMonitorList ( monitorList ) ;
// devMonitorId = 被检设备ID+通道号
List < String > devMonitorIds = new ArrayList < > ( ) ;
for ( PqDev dev : devList ) {
List < String > channelNoList = StrUtil . split ( dev . getInspectChannel ( ) , StrUtil . COMMA ) ;
for ( String channelNo : channelNoList ) {
devMonitorIds . add ( dev . getId ( ) + StrUtil . UNDERLINE + channelNo ) ;
}
}
planCheckDataVO . setDevMonitorIds ( devMonitorIds ) ;
// 设备通道匹对关系
List < AdPair > pairList = adPairService . list ( new LambdaQueryWrapper < AdPair > ( ) . eq ( AdPair : : getPlanId , planId ) . in ( AdPair : : getDevMonitorId , devMonitorIds ) ) ;
planCheckDataVO . setPairList ( pairList ) ;
progress . addAndGet ( 1 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 生成检测计划基本信息数据文件中,请耐心等待... " ) ) ;
// 获取计划检测结果数据表以及数据
Integer code = plan . getCode ( ) ;
List < String > dataTableNames = CollUtil . newArrayList ( " ad_harmonic_ " + code , " ad_non_harmonic_ " + code , " ad_harmonic_result_ " + code , " ad_non_harmonic_result_ " + code ) ;
// 创建临时目录用于存储文件
File tempDataDir = FileUtil . mkdir ( FileUtil . getTmpDirPath ( ) + " plan_test_data_ " + System . currentTimeMillis ( ) + " / " ) ;
int dataBatch = 0 ;
int current = 0 ;
if ( CollUtil . isNotEmpty ( pairList ) ) {
for ( String dataTableName : dataTableNames ) {
// 创建数据文件
String fileName = dataTableName . replace ( " _ " + code , " " ) + " .txt " ;
File dataFile = FileUtil . file ( tempDataDir , fileName ) ;
// 确保文件存在
FileUtil . touch ( dataFile ) ;
// 初始化写入标志,用于判断是否已写入字段名
boolean isFirstWrite = true ;
// 分页查询,避免一次性加载大量数据
int pageSize = BATCH_SIZE ; // 每页查询10000条记录
int offset = 0 ;
List < Map < String , Object > > pageData ;
do {
dataBatch + = 1 ;
if ( current < FINAL_STEP + 5 ) {
current = Math . min ( current + 1 , FINAL_STEP + 5 ) ;
}
currentProgress . set ( current ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress . get ( ) + currentProgress . get ( ) , " 生成检测结果数据文件中,请耐心等待... " ) ) ;
String paginatedSql = buildPaginatedQuery ( dataTableName , devMonitorIds , pageSize , offset ) ;
pageData = jdbcTemplate . queryForList ( paginatedSql ) ;
// 将当前页数据追加到文件中
if ( CollUtil . isNotEmpty ( pageData ) ) {
StringBuilder content = new StringBuilder ( ) ;
// 如果是第一次写入,先写入字段名
if ( isFirstWrite ) {
// 获取字段名
Map < String , Object > firstRow = pageData . get ( 0 ) ;
List < String > fieldNames = new ArrayList < > ( firstRow . keySet ( ) ) ;
// 写入字段名作为第一行
content . append ( StrUtil . join ( " \ t " , fieldNames ) ) . append ( System . lineSeparator ( ) ) ;
isFirstWrite = false ;
}
// 写入数据行
for ( Map < String , Object > data : pageData ) {
List < Object > values = new ArrayList < > ( data . values ( ) ) ;
content . append ( StrUtil . join ( " \ t " , values ) ) . append ( System . lineSeparator ( ) ) ;
}
// 追加内容到文件
FileUtil . appendUtf8String ( content . toString ( ) , dataFile ) ;
}
offset + = pageSize ;
} while ( pageData . size ( ) = = pageSize ) ; // 如果查询结果少于pageSize, 说明已经查询完所有数据
}
}
planCheckDataVO . setDataBatch ( dataBatch ) ;
int currentVal = progress . get ( ) + currentProgress . get ( ) ;
if ( currentVal < FINAL_STEP + 5 ) {
progress . addAndGet ( FINAL_STEP + 5 ) ;
} else {
progress . set ( currentVal ) ;
}
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 压缩检测结果数据文件中,请耐心等待... " ) ) ;
// 导出数据.zip文件
String jsonStr = JSONUtil . toJsonStr ( planCheckDataVO , new JSONConfig ( ) . setIgnoreNullValue ( false ) ) ;
try {
// 创建 JSON 文件
String jsonFileName = plan . getName ( ) + " .json " ;
File jsonFile = FileUtil . file ( tempDataDir , jsonFileName ) ;
FileUtil . writeUtf8String ( jsonStr , jsonFile ) ;
// 创建 ZIP 文件
String zipFileName = plan . getName ( ) + " _检测数据包.zip " ;
File zipFile = FileUtil . file ( dataPath + File . separator + TEST_DATA_DIR + File . separator , zipFileName ) ;
// 添加检测报告文件
if ( ObjectUtil . isNotNull ( report ) & & report . equals ( 1 ) ) {
for ( PqDev dev : devList ) {
DevType devType = devTypeService . getById ( dev . getDevType ( ) ) ;
String dirPath = reportPath . concat ( File . separator ) . concat ( devType . getName ( ) ) ;
File reportFile = new File ( dirPath . concat ( File . separator ) . concat ( dev . getCreateId ( ) ) . concat ( ReportConstant . DOCX ) ) ;
// 如果reportFile存在, 则将reportFile中的文件添加到已有的zip文件中
if ( FileUtil . exist ( reportFile ) ) {
// 复制reportFile到临时目录
FileUtil . copy ( reportFile , tempDataDir , true ) ;
}
}
}
// 创建zip文件, 包含所有文件
ZipUtil . zip ( tempDataDir . getAbsolutePath ( ) , zipFile . getAbsolutePath ( ) ) ;
// 删除临时目录
FileUtil . del ( tempDataDir ) ;
LocalDateTime endTime = LocalDateTime . now ( ) ;
log . info ( " 生成数据包完成,耗时: {}s " , Duration . between ( startTime , endTime ) . getSeconds ( ) ) ;
progress . set ( 100 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , zipFile . getAbsolutePath ( ) ) ) ;
} catch ( Exception e ) {
log . error ( " 生成数据包失败 " , e ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . FAIL . getCode ( ) , progress . get ( ) + currentProgress . get ( ) , " 生成数据包失败 " ) ) ;
} finally {
NonWebAutoFillValueHandler . clearCurrentUserId ( ) ;
}
sseClient . closeSse ( uid ) ;
}
@Transactional
@Async
public void importAndMergePlanCheckData ( MultipartFile file , String uid , String planId ) {
NonWebAutoFillValueHandler . setCurrentUserId ( uid ) ;
AtomicInteger progress = new AtomicInteger ( ) ;
LocalDateTime startTime = LocalDateTime . now ( ) ;
AtomicInteger progress = new AtomicInteger ( 0 ) ;
AtomicInteger currentProgress = new AtomicInteger ( 0 ) ;
AtomicInteger dataCount = new AtomicInteger ( 0 ) ;
try {
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始保存文件 " ) ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始保存文件,请耐心等待... " ) ) ;
// 创建临时目录用于解压文件
File tempDir = FileUtil . mkdir ( FileUtil . getTmpDirPath ( ) + " import_plan_check_data_ " + System . currentTimeMillis ( ) + " / " ) ;
@@ -69,7 +243,7 @@ public class AsyncPlanHandler {
File zipFile = FileUtil . file ( tempDir , file . getOriginalFilename ( ) ) ;
file . transferTo ( zipFile ) ;
progress . addAndGet ( 1 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始解压文件 " ) ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始解压文件,请耐心等待... " ) ) ;
// 解压zip文件
File unzipDir = FileUtil . mkdir ( FileUtil . file ( tempDir , " unzip " ) ) ;
@@ -123,17 +297,17 @@ public class AsyncPlanHandler {
}
if ( ! StrUtil . equals ( planId , subPlan . getFatherPlanId ( ) ) ) {
FileUtil . del ( tempDir ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . FAIL . getCode ( ) , progress , " 该 当前检修计划的子计划" ) ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . FAIL . getCode ( ) , progress , " 非 当前检修计划的子计划" ) ) ;
return ;
}
progress . addAndGet ( 1 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始同步计划基本信息 " ) ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始同步计划基本信息,请耐心等待... " ) ) ;
// 更新检测计划信息
checkPlan . setFatherPlanId ( subPlan . getFatherPlanId ( ) ) ;
checkPlan . setImportFlag ( 0 ) ;
adPlanService . updateById ( checkPlan ) ;
progress . addAndGet ( 1 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始同步计划设备信息 " ) ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始同步计划设备信息,请耐心等待... " ) ) ;
// 批量更新被检设备信息
// 不更新导入标志
@@ -145,7 +319,7 @@ public class AsyncPlanHandler {
pqDevSubService . update ( devSub , new LambdaUpdateWrapper < PqDevSub > ( ) . eq ( PqDevSub : : getDevId , devSub . getDevId ( ) ) ) ;
}
progress . addAndGet ( 1 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始同步通道配对信息 " ) ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始同步通道配对信息,请耐心等待... " ) ) ;
// 同步检测数据
List < AdPair > pairList = planCheckDataVO . getPairList ( ) ;
@@ -173,7 +347,7 @@ public class AsyncPlanHandler {
AdPlan plan = adPlanService . getById ( planId ) ;
Integer planCode = plan . getCode ( ) ;
progress . addAndGet ( 1 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始同步检测数据信息 " ) ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始同步检测数据信息,请耐心等待... " ) ) ;
// 合并前清除相关表数据
String mainHarmonicTableName = " ad_harmonic_ " + planCode ;
String mainNonHarmonicTableName = " ad_non_harmonic_ " + planCode ;
@@ -195,7 +369,8 @@ public class AsyncPlanHandler {
jdbcTemplate . update ( " DELETE FROM " + mainNonHarmonicResultTableName + " WHERE dev_monitor_id IN ( " + devMonitorIdsStr + " ) " ) ;
}
int dataBatch = planCheckDataVO . getDataBatch ( ) ;
int step = 80 / ( dataBatch + 1 ) ;
int stepCount = dataBatch * BATCH_SIZE / FINAL_STEP ;
for ( File dataFile : dataFiles ) {
// 直接插入主计划表中
String fileName = FileUtil . mainName ( dataFile ) ;
@@ -206,9 +381,18 @@ public class AsyncPlanHandler {
final boolean [ ] isFirstBatch = { true } ;
final String [ ] headers = { null } ;
BatchFileReader . readLinesInBatches ( dataFile , 10000 , lines - > {
progress . addAndGet ( step ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 同步检测数据信息中 " ) ) ;
BatchFileReader . readLinesInBatches ( dataFile , BATCH_SIZE , lines - > {
dataCount . addAndGet ( lines . size ( ) ) ;
// 计算当前进度
int current = dataCount . get ( ) / stepCount ;
// 确保进度不超过finalStep
if ( current > FINAL_STEP ) {
current = FINAL_STEP ;
}
currentProgress . set ( current ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress . get ( ) + currentProgress . get ( ) , " 同步检测数据信息中,请耐心等待... " ) ) ;
if ( CollUtil . isNotEmpty ( lines ) ) {
if ( isFirstBatch [ 0 ] ) {
@@ -229,30 +413,47 @@ public class AsyncPlanHandler {
progress . addAndGet ( 1 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 表 " + tableName + " 数据同步完成 " ) ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress . get ( ) + currentProgress . get ( ) , " 表 " + tableName + " 数据同步完成 " ) ) ;
}
}
progress . addAndGet ( 1 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 开始合并检测数据信息 " ) ) ;
// 删除临时目录
FileUtil . del ( tempDir ) ;
LocalDateTime endTime = LocalDateTime . now ( ) ;
log . info ( " 数据合并完成,耗时:{}s " , Duration . between ( startTime , endTime ) . getSeconds ( ) ) ;
progress . set ( 100 ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . SUCCESS . getCode ( ) , progress , " 数据合并完成 " ) ) ;
} catch ( Exception e ) {
log . error ( " 导入数据失败 " , e ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . FAIL . getCode ( ) , progress , " 导入失败 " ) ) ;
sseClient . sendMessage ( uid , planId , HttpResultUtil . assembleResult ( CommonResponseEnum . FAIL . getCode ( ) , progress . get ( ) + currentProgress . get ( ) , " 导入失败 " ) ) ;
} finally {
NonWebAutoFillValueHandler . clearCurrentUserId ( ) ;
}
sseClient . closeSse ( uid ) ;
}
// 构建分页查询SQL
private String buildPaginatedQuery ( String tableName , List < String > devMonitorIds , int limit , int offset ) {
StringBuilder sql = new StringBuilder ( " SELECT * FROM " + tableName ) ;
sql . append ( " WHERE Dev_Monitor_Id IN ( " ) ;
for ( int i = 0 ; i < devMonitorIds . size ( ) ; i + + ) {
sql . append ( " ' " ) . append ( devMonitorIds . get ( i ) ) . append ( " ' " ) ;
if ( i < devMonitorIds . size ( ) - 1 ) {
sql . append ( " , " ) ;
}
}
sql . append ( " ) " ) ;
sql . append ( " ORDER BY Id " ) ;
// 添加分页限制
sql . append ( " LIMIT " ) . append ( limit ) . append ( " OFFSET " ) . append ( offset ) ;
return sql . toString ( ) ;
}
/**
* 处理数据行