From 67bfba23ebddfee27f17e5c891db4e53052890c3 Mon Sep 17 00:00:00 2001 From: wr <1754607820@qq.com> Date: Tue, 17 Jun 2025 16:49:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BE=AE=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../MigrationInfluxDBController.java | 22 ++++ .../read/job/MigrationInfluxDBJob.java | 2 + .../service/impl/MigrationServiceImpl.java | 100 ++++++++++-------- .../njcn/migration/read/util/TimeUtil.java | 2 + 4 files changed, 79 insertions(+), 47 deletions(-) diff --git a/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/controller/MigrationInfluxDBController.java b/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/controller/MigrationInfluxDBController.java index 8238375..41db439 100644 --- a/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/controller/MigrationInfluxDBController.java +++ b/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/controller/MigrationInfluxDBController.java @@ -87,4 +87,26 @@ public class MigrationInfluxDBController { fileInputStream.close(); responseOutputStream.close(); } + + + @GetMapping(value = "/influxdbCs",produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) + @ApiOperation(value ="influxdb数据同步->天数按小时进行分组同步测试", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) + public void influxdbCs() { + System.out.println("--------------------------------influxdb同步------------------------------------"); + // 获取当前时间 + LocalDateTime now = LocalDateTime.now(); + // 减去2个小时 + LocalDateTime oneHourAgo = now.minusHours(2); + // 将分钟和秒设置为0 + LocalDateTime result = oneHourAgo.truncatedTo(ChronoUnit.HOURS); + // 加上59分钟59秒 + LocalDateTime modifiedResult = result.plusMinutes(59).plusSeconds(59); + + LineCountEvaluateParam param = new LineCountEvaluateParam(); + param.setIsManual(false); + param.setStartTime(result.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + param.setEndTime(modifiedResult.format(DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN))); + migrationService.hourseLineDataBacthSysc(param); + migrationService.hourseDevDataBacthSysc(param); + } } diff --git a/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/job/MigrationInfluxDBJob.java b/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/job/MigrationInfluxDBJob.java index fa5a00d..e468011 100644 --- a/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/job/MigrationInfluxDBJob.java +++ b/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/job/MigrationInfluxDBJob.java @@ -43,6 +43,7 @@ public class MigrationInfluxDBJob { private final RmpEventDetailMapper detailMapper; @Scheduled(cron = "0 2 * * * ?") public void InfluxDBJob() { + System.out.println("--------------------------------influxdb同步------------------------------------"); // 获取当前时间 LocalDateTime now = LocalDateTime.now(); // 减去2个小时 @@ -62,6 +63,7 @@ public class MigrationInfluxDBJob { @Scheduled(cron = "0 0 22 * * ?") public void mapJob() throws IOException { + System.out.println("--------------------------------excel文件同步------------------------------------"); File file = new File("/usr/local/jar/sj.xlsx"); List excelDataV = EasyExcel.read(file) .head(LineTimeDto.class) diff --git a/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/service/impl/MigrationServiceImpl.java b/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/service/impl/MigrationServiceImpl.java index acf8566..517e117 100644 --- a/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/service/impl/MigrationServiceImpl.java +++ b/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/service/impl/MigrationServiceImpl.java @@ -58,43 +58,47 @@ public class MigrationServiceImpl implements MigrationService { Map map = TimeUtil.getLineMap(); int size = map.size(); final Integer[] num = {0}; - map.forEach((lineId,time)->{ - num[0] = num[0] + 1; - System.out.println("当前总监测点数量"+size+"当前第"+num[0]+":-》"+lineId+" "+param.getStartTime()+" "+param.getEndTime()+"剩余"+(size-num[0])); - String format=null; - if(!param.getIsManual()){ - if(StrUtil.isNotBlank(time)){ - param.setStartTime(time); + map.forEach((lineId, time) -> { + num[0] = num[0] + 1; + LineCountEvaluateParam evaluateParam = new LineCountEvaluateParam(); + evaluateParam.setLineId(Arrays.asList(lineId)); + evaluateParam.setStartTime(param.getStartTime()); + evaluateParam.setEndTime(param.getEndTime()); + + if (!param.getIsManual()) { + if (StrUtil.isNotBlank(time)) { + evaluateParam.setStartTime(time); } } - param.setLineId(Arrays.asList(lineId)); - List dataVS = dataV.listDataV(param); - if(CollUtil.isNotEmpty(dataVS)){ - if(!param.getIsManual()){ + + String format = null; + List dataVS = dataV.listDataV(evaluateParam); + if (CollUtil.isNotEmpty(dataVS)) { + if (!param.getIsManual()) { format = dataVS.get(0).getTimeId(); } migrationInsertFeignClient.insertDataV(dataVS); } - migrationInsertFeignClient.insertDataFlicker(dataFlicker.listDataFlicker(param)); - migrationInsertFeignClient.insertDataFluc(dataFluc.listDataFluc(param)); - migrationInsertFeignClient.insertDataHarmphasicI(dataHarmphasicI.listDataHarmphasicI(param)); - migrationInsertFeignClient.insertDataHarmphasicV(dataHarmphasicV.listDataHarmphasicV(param)); - migrationInsertFeignClient.insertDataHarmpowerP(dataHarmpowerP.listDataHarmpowerP(param)); - migrationInsertFeignClient.insertDataHarmpowerQ(dataHarmpowerQ.listDataHarmpowerQ(param)); - migrationInsertFeignClient.insertDataHarmpowerS(dataHarmpowerS.listDataHarmpowerS(param)); - migrationInsertFeignClient.insertDataHarmrateI(dataHarmRateI.listDataHarmrateI(param)); - migrationInsertFeignClient.insertDataHarmrateV(dataHarmRateV.listDataHarmrateV(param)); - migrationInsertFeignClient.insertDataI(dataI.listDataI(param)); - migrationInsertFeignClient.insertDataInharmI(dataInharmI.listDataInharmI(param)); - migrationInsertFeignClient.insertDataInharmV(dataInharmV.listDataInharmV(param)); - migrationInsertFeignClient.insertDataPlt(dataPlt.listDataPlt(param)); - migrationInsertFeignClient.batchInsertion(eventDetail.getRawData(param)); + migrationInsertFeignClient.insertDataFlicker(dataFlicker.listDataFlicker(evaluateParam)); + migrationInsertFeignClient.insertDataFluc(dataFluc.listDataFluc(evaluateParam)); + migrationInsertFeignClient.insertDataHarmphasicI(dataHarmphasicI.listDataHarmphasicI(evaluateParam)); + migrationInsertFeignClient.insertDataHarmphasicV(dataHarmphasicV.listDataHarmphasicV(evaluateParam)); + migrationInsertFeignClient.insertDataHarmpowerP(dataHarmpowerP.listDataHarmpowerP(evaluateParam)); + migrationInsertFeignClient.insertDataHarmpowerQ(dataHarmpowerQ.listDataHarmpowerQ(evaluateParam)); + migrationInsertFeignClient.insertDataHarmpowerS(dataHarmpowerS.listDataHarmpowerS(evaluateParam)); + migrationInsertFeignClient.insertDataHarmrateI(dataHarmRateI.listDataHarmrateI(evaluateParam)); + migrationInsertFeignClient.insertDataHarmrateV(dataHarmRateV.listDataHarmrateV(evaluateParam)); + migrationInsertFeignClient.insertDataI(dataI.listDataI(evaluateParam)); + migrationInsertFeignClient.insertDataInharmI(dataInharmI.listDataInharmI(evaluateParam)); + migrationInsertFeignClient.insertDataInharmV(dataInharmV.listDataInharmV(evaluateParam)); + migrationInsertFeignClient.insertDataPlt(dataPlt.listDataPlt(evaluateParam)); + migrationInsertFeignClient.batchInsertion(eventDetail.getRawData(evaluateParam)); + System.out.println("定时任务_当前总监测点数量" + size + "当前第" + num[0] + ":-》" + lineId + " 文件时间 " + time + " |最新数据时间" + format + "| " + evaluateParam.getStartTime() + " " + evaluateParam.getEndTime() + "剩余" + (size - num[0])); - if(!param.getIsManual()&&StrUtil.isNotBlank(format)){ - TimeUtil.putLineTime(lineId,format); + if (!param.getIsManual() && StrUtil.isNotBlank(format)) { + TimeUtil.putLineTime(lineId, format); } }); - System.gc(); } @Override @@ -103,38 +107,40 @@ public class MigrationServiceImpl implements MigrationService { Map map = TimeUtil.getDevMap(); int size = map.size(); final Integer[] num = {0}; - map.forEach((lineId,time)->{ + map.forEach((lineId, time) -> { num[0] = num[0] + 1; - System.out.println("当前总终端数量"+size+"当前第"+num[0]+":-》"+lineId+" "+param.getStartTime()+" "+param.getEndTime()+"剩余"+(size-num[0])); - String format=null; - if(!param.getIsManual()){ - if(StrUtil.isNotBlank(time)){ + LineCountEvaluateParam evaluateParam = new LineCountEvaluateParam(); + evaluateParam.setLineId(Arrays.asList(lineId)); + evaluateParam.setStartTime(param.getStartTime()); + evaluateParam.setEndTime(param.getEndTime()); + + String format = null; + if (!param.getIsManual()) { + if (StrUtil.isNotBlank(time)) { param.setStartTime(time); } } - param.setLineId(Arrays.asList(lineId)); - List pqsCommunicates = pqsCommunicate.listPqsCommunicate(param); - if(CollUtil.isNotEmpty(pqsCommunicates)){ - if(!param.getIsManual()){ + List pqsCommunicates = pqsCommunicate.listPqsCommunicate(evaluateParam); + if (CollUtil.isNotEmpty(pqsCommunicates)) { + if (!param.getIsManual()) { format = pqsCommunicates.get(0).getTimeId(); } migrationInsertFeignClient.insertPqsCommunicate(pqsCommunicates); } - if(!param.getIsManual()&&StrUtil.isNotBlank(format)){ - TimeUtil.putDevTime(lineId,format); + System.out.println("当前总终端数量" + size + "当前第" + num[0] + ":-》" + lineId + " 文件时间 " + time + " |最新数据时间" + format + "| " + evaluateParam.getStartTime() + " " + evaluateParam.getEndTime() + "剩余" + (size - num[0])); + if (!param.getIsManual() && StrUtil.isNotBlank(format)) { + TimeUtil.putDevTime(lineId, format); } }); - System.gc(); } @Override - @Async("asyncInfluxDBExecutor") public void initializeExcel() { File file = new File("/usr/local/jar/sj.xlsx"); Map map = TimeUtil.getLineMap(); List lineExcel = new ArrayList<>(); - map.forEach((line,value)->{ - LineTimeDto data=new LineTimeDto(); + map.forEach((line, value) -> { + LineTimeDto data = new LineTimeDto(); data.setLineId(line); data.setTimeData(value); lineExcel.add(data); @@ -142,21 +148,21 @@ public class MigrationServiceImpl implements MigrationService { Map devmap = TimeUtil.getDevMap(); List devExcel = new ArrayList<>(); - devmap.forEach((line,value)->{ - DevTimeDto data=new DevTimeDto(); + devmap.forEach((line, value) -> { + DevTimeDto data = new DevTimeDto(); data.setDevId(line); data.setTimeData(value); devExcel.add(data); }); ExcelWriter excelWriter = EasyExcel.write(file).build(); //模板1 - WriteSheet writeSheet = EasyExcel.writerSheet(0, "line" ).head(LineTimeDto.class) + WriteSheet writeSheet = EasyExcel.writerSheet(0, "line").head(LineTimeDto.class) .registerWriteHandler(new LongestMatchColumnWidthStyleStrategy()) .build(); excelWriter.write(lineExcel, writeSheet); //模板2 - WriteSheet writeSheet2 = EasyExcel.writerSheet(1, "dev" ).head(DevTimeDto.class) + WriteSheet writeSheet2 = EasyExcel.writerSheet(1, "dev").head(DevTimeDto.class) .registerWriteHandler(new LongestMatchColumnWidthStyleStrategy()) .build(); excelWriter.write(devExcel, writeSheet2); diff --git a/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/util/TimeUtil.java b/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/util/TimeUtil.java index f9d89aa..9b6d929 100644 --- a/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/util/TimeUtil.java +++ b/migration-influxdb/migration-influxdb-read-boot/src/main/java/com/njcn/migration/read/util/TimeUtil.java @@ -132,5 +132,7 @@ public class TimeUtil { excelWriter.finish(); TimeUtil.putAllLineTime(excelDataV.stream().collect(Collectors.toMap(LineTimeDto::getLineId, LineTimeDto::getTimeData))); TimeUtil.putAllDevTime(excelCommunicates.stream().collect(Collectors.toMap(DevTimeDto::getDevId, DevTimeDto::getTimeData))); + System.out.println(" line "+TimeUtil.getLineMap()); + System.out.println(" dev "+TimeUtil.getDevMap()); } }