From a4b29c6b76bb30807b9b4cc6871c823ddbe6a11a Mon Sep 17 00:00:00 2001
From: hzj <826100833@qq.com>
Date: Sat, 11 Oct 2025 10:16:47 +0800
Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E7=89=88=E6=9C=AC=E6=8F=90?=
=?UTF-8?q?=E4=BA=A4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.gitignore | 38 ++++++++
pom.xml | 92 +++++++++++++++++++
.../njcn/kafkadata/KafkaDataApplication.java | 25 +++++
.../controller/HarmonicDataController.java | 26 ++++++
.../controller/KafkaTestController.java | 28 ++++++
.../com/njcn/kafkadata/job/PushKafkaJob.java | 38 ++++++++
.../pojo/dto/HarmonicMessageDTO.java | 17 ++++
.../njcn/kafkadata/pojo/param/PushParam.java | 34 +++++++
.../njcn/kafkadata/pojo/param/TableEnum.java | 73 +++++++++++++++
.../service/HarmonicDataService.java | 14 +++
.../service/KafkaProducerService.java | 15 +++
.../service/impl/HarmonicDataServiceImpl.java | 76 +++++++++++++++
.../service/impl/KafkaConsumerService.java | 25 +++++
.../impl/KafkaProducerServiceImpl.java | 50 ++++++++++
.../kafkadata/util/TimestampFileUtil.java | 68 ++++++++++++++
src/main/resources/application.yml | 38 ++++++++
16 files changed, 657 insertions(+)
create mode 100644 .gitignore
create mode 100644 pom.xml
create mode 100644 src/main/java/com/njcn/kafkadata/KafkaDataApplication.java
create mode 100644 src/main/java/com/njcn/kafkadata/controller/HarmonicDataController.java
create mode 100644 src/main/java/com/njcn/kafkadata/controller/KafkaTestController.java
create mode 100644 src/main/java/com/njcn/kafkadata/job/PushKafkaJob.java
create mode 100644 src/main/java/com/njcn/kafkadata/pojo/dto/HarmonicMessageDTO.java
create mode 100644 src/main/java/com/njcn/kafkadata/pojo/param/PushParam.java
create mode 100644 src/main/java/com/njcn/kafkadata/pojo/param/TableEnum.java
create mode 100644 src/main/java/com/njcn/kafkadata/service/HarmonicDataService.java
create mode 100644 src/main/java/com/njcn/kafkadata/service/KafkaProducerService.java
create mode 100644 src/main/java/com/njcn/kafkadata/service/impl/HarmonicDataServiceImpl.java
create mode 100644 src/main/java/com/njcn/kafkadata/service/impl/KafkaConsumerService.java
create mode 100644 src/main/java/com/njcn/kafkadata/service/impl/KafkaProducerServiceImpl.java
create mode 100644 src/main/java/com/njcn/kafkadata/util/TimestampFileUtil.java
create mode 100644 src/main/resources/application.yml
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..5ff6309
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..6d5df4b
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,92 @@
+
+
+ 4.0.0
+
+ com.njcn
+ kafka-harmonic-data
+ 1.0-SNAPSHOT
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.3.12.RELEASE
+
+
+ 8
+ 8
+ UTF-8
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.3.12.RELEASE
+
+
+
+ org.springframework.kafka
+ spring-kafka
+ 2.5.14.RELEASE
+
+
+ org.projectlombok
+ lombok
+ 1.18.18
+
+
+
+ com.njcn
+ pqs-influx
+ 1.0.0
+
+
+
+
+
+
+
+ kafka-harmonic-data
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ package
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ 1.8
+ 1.8
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 2.2.1
+
+
+ attach-sources
+
+ jar-no-fork
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/njcn/kafkadata/KafkaDataApplication.java b/src/main/java/com/njcn/kafkadata/KafkaDataApplication.java
new file mode 100644
index 0000000..a8e1600
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/KafkaDataApplication.java
@@ -0,0 +1,25 @@
+package com.njcn.kafkadata;
+
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
+import org.springframework.boot.autoconfigure.security.servlet.SecurityFilterAutoConfiguration;
+import org.springframework.context.annotation.DependsOn;
+
+/**
+ * pqs
+ *
+ * @author cdf
+ * @date 2022/11/10
+ */
+
+@SpringBootApplication
+@DependsOn("proxyMapperRegister")
+public class KafkaDataApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaDataApplication.class, args);
+ }
+
+}
diff --git a/src/main/java/com/njcn/kafkadata/controller/HarmonicDataController.java b/src/main/java/com/njcn/kafkadata/controller/HarmonicDataController.java
new file mode 100644
index 0000000..65983b2
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/controller/HarmonicDataController.java
@@ -0,0 +1,26 @@
+package com.njcn.kafkadata.controller;
+
+import com.njcn.kafkadata.pojo.param.PushParam;
+import com.njcn.kafkadata.service.HarmonicDataService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.*;
+
+/**
+ * Description:
+ * Date: 2025/05/27 下午 2:56【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+@Slf4j
+@RestController
+@RequestMapping("/harmonicdata")
+@AllArgsConstructor
+public class HarmonicDataController {
+ private final HarmonicDataService harmonicDataService;
+ @PostMapping("/push")
+ public void push(@RequestBody PushParam pushParam) throws Exception {
+ harmonicDataService.push(pushParam);
+ }
+}
diff --git a/src/main/java/com/njcn/kafkadata/controller/KafkaTestController.java b/src/main/java/com/njcn/kafkadata/controller/KafkaTestController.java
new file mode 100644
index 0000000..e444ba5
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/controller/KafkaTestController.java
@@ -0,0 +1,28 @@
+package com.njcn.kafkadata.controller;
+
+import com.njcn.kafkadata.service.KafkaProducerService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.*;
+
+/**
+ * Description:
+ * Date: 2025/05/27 下午 2:06【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+
+@Slf4j
+@RestController
+@RequestMapping("/test")
+@AllArgsConstructor
+public class KafkaTestController {
+ private final KafkaProducerService kafkaProducerService;
+
+ @GetMapping("/testkafka")
+
+ public void test(@RequestParam("context") String context) {
+ kafkaProducerService.sendMessageWithCallback("harmonic-data",context);
+ }
+}
diff --git a/src/main/java/com/njcn/kafkadata/job/PushKafkaJob.java b/src/main/java/com/njcn/kafkadata/job/PushKafkaJob.java
new file mode 100644
index 0000000..ec5b747
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/job/PushKafkaJob.java
@@ -0,0 +1,38 @@
+package com.njcn.kafkadata.job;
+
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.LocalDateTimeUtil;
+import com.njcn.kafkadata.pojo.param.PushParam;
+import com.njcn.kafkadata.service.HarmonicDataService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+
+/**
+ * Description:
+ * Date: 2025/05/28 下午 2:34【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+@Component
+@EnableScheduling
+@RequiredArgsConstructor
+@Slf4j
+public class PushKafkaJob {
+ private final HarmonicDataService harmonicDataService;
+
+ @Scheduled(cron="0 0/5 * * * ?")
+ public void pqOnlinerate() throws Exception {
+ PushParam pushParam = new PushParam();
+ // 获取当前时间
+ LocalDateTime now = LocalDateTime.now();
+ pushParam.setBeginTime(LocalDateTimeUtil.format(now.minusMinutes(10), DatePattern.NORM_DATETIME_PATTERN));
+ pushParam.setEndTime(LocalDateTimeUtil.format(now.minusMinutes(5), DatePattern.NORM_DATETIME_PATTERN));
+ harmonicDataService.push(pushParam);
+ }
+}
diff --git a/src/main/java/com/njcn/kafkadata/pojo/dto/HarmonicMessageDTO.java b/src/main/java/com/njcn/kafkadata/pojo/dto/HarmonicMessageDTO.java
new file mode 100644
index 0000000..31a46d0
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/pojo/dto/HarmonicMessageDTO.java
@@ -0,0 +1,17 @@
+package com.njcn.kafkadata.pojo.dto;
+
+import lombok.Data;
+
+/**
+ * Description:
+ * Date: 2025/05/27 下午 3:32【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+@Data
+public class HarmonicMessageDTO {
+ private String tableName;
+ private String dataJson;
+
+}
diff --git a/src/main/java/com/njcn/kafkadata/pojo/param/PushParam.java b/src/main/java/com/njcn/kafkadata/pojo/param/PushParam.java
new file mode 100644
index 0000000..3d2de8c
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/pojo/param/PushParam.java
@@ -0,0 +1,34 @@
+package com.njcn.kafkadata.pojo.param;
+
+import lombok.Data;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Description:
+ * Date: 2025/05/28 上午 8:47【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+@Data
+public class PushParam {
+
+ //补招起始日期_yyyy-MM-dd(按小时跑的任务可加时分秒
+ private String beginTime;
+
+ //补招截止日期_yyyy-MM-dd(按小时跑的任务可加时分秒)
+ private String endTime;
+
+ //时间日期_yyyy-MM-dd(按小时跑的任务可加时分秒)
+ private String dataDate;
+
+
+
+ /**
+ * 待计算的对象索引集合,监测点、设备、母线、变电站、单位等等
+ */
+ private List idList;
+
+}
diff --git a/src/main/java/com/njcn/kafkadata/pojo/param/TableEnum.java b/src/main/java/com/njcn/kafkadata/pojo/param/TableEnum.java
new file mode 100644
index 0000000..2eba3e7
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/pojo/param/TableEnum.java
@@ -0,0 +1,73 @@
+package com.njcn.kafkadata.pojo.param;
+
+import lombok.Getter;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Description:
+ * Date: 2024/1/4 13:54【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+@Getter
+public enum TableEnum {
+ DATAFLICKER("DataFlicker","电压闪变数据表", "data_flicker"),
+ DATAFLUC("DataFluc","电压波动数据表", "data_fluc"),
+ DATAHARMPHASICI("DataHarmPhasicI","谐波电流角度数据表", "data_harmphasic_i"),
+ DATAHARMPHASICV("DataHarmPhasicV","谐波电压角度数据表", "data_harmphasic_v"),
+ DATAHARMPOWERP("DataHarmPowerP","有功功率数据表", "data_harmpower_p"),
+ DATAHARMPOWERQ("DataHarmPowerQ","无功功率数据表", "data_harmpower_q"),
+ DATAHARMPOWERS("DataHarmPowerS","视在功率数据表", "data_harmpower_s"),
+ DATAHARMRATEI("DataHarmRateI","谐波电流含有率数据表", "data_harmrate_i"),
+ DATAHARMRATEV("DataHarmRateV","谐波电压含有率数据表", "data_harmrate_v"),
+ DATAINHARMI("DataInHarmI","电流简谐波幅值数据表", "data_inharm_i"),
+ DATAINHARMV("DataInHarmV","电压间谐波幅值数据表", "data_inharm_v"),
+ DATAI("DataI","谐波电流幅值数据表", "data_i"),
+ DATAPLT("DataPlt","长时闪变数据表", "data_plt"),
+ DATAV("DataV","谐波电压幅值数据表", "data_v"),
+ ;
+
+
+ private final String code;
+ private final String value;
+
+ private final String tableName;
+
+ TableEnum(String code, String value, String tableName) {
+ this.code = code;
+ this.value = value;
+ this.tableName = tableName;
+ }
+
+ /**
+ * 获取所有可执行的组件类型,
+ * 除了"目录"类型,其他都可以执行
+ * @return
+ */
+ public static List getExecutableTypes() {
+ return Arrays.stream(TableEnum.values()).map(tmep->{
+ return tmep.code;
+ }).collect(Collectors.toList());
+ }
+
+
+ /**
+ * 通过code获取枚举值
+ * @param code
+ * @return
+ */
+ public static String getValueByCode(String code) {
+ for (TableEnum item : TableEnum.values()) {
+ if (item.code.equals(code)) {
+ return item.value;
+ }
+ }
+ return null;
+ }
+
+
+}
diff --git a/src/main/java/com/njcn/kafkadata/service/HarmonicDataService.java b/src/main/java/com/njcn/kafkadata/service/HarmonicDataService.java
new file mode 100644
index 0000000..8f0fa11
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/service/HarmonicDataService.java
@@ -0,0 +1,14 @@
+package com.njcn.kafkadata.service;
+
+import com.njcn.kafkadata.pojo.param.PushParam;
+
+/**
+ * Description:
+ * Date: 2025/05/28 上午 8:38【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+public interface HarmonicDataService {
+ void push(PushParam pushParam) throws Exception;
+}
diff --git a/src/main/java/com/njcn/kafkadata/service/KafkaProducerService.java b/src/main/java/com/njcn/kafkadata/service/KafkaProducerService.java
new file mode 100644
index 0000000..760f854
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/service/KafkaProducerService.java
@@ -0,0 +1,15 @@
+package com.njcn.kafkadata.service;
+
+/**
+ * Description:
+ * Date: 2025/05/27 下午 1:57【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+public interface KafkaProducerService {
+ void sendMessage(String topic, String message);
+
+ // 带回调的发送
+ void sendMessageWithCallback(String topic, String message);
+}
diff --git a/src/main/java/com/njcn/kafkadata/service/impl/HarmonicDataServiceImpl.java b/src/main/java/com/njcn/kafkadata/service/impl/HarmonicDataServiceImpl.java
new file mode 100644
index 0000000..4b08167
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/service/impl/HarmonicDataServiceImpl.java
@@ -0,0 +1,76 @@
+package com.njcn.kafkadata.service.impl;
+
+import cn.hutool.extra.spring.SpringUtil;
+import cn.hutool.json.JSONUtil;
+import com.njcn.influx.base.InfluxDbBaseMapper;
+import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
+import com.njcn.influx.query.InfluxQueryWrapper;
+import com.njcn.kafkadata.pojo.dto.HarmonicMessageDTO;
+import com.njcn.kafkadata.pojo.param.PushParam;
+import com.njcn.kafkadata.pojo.param.TableEnum;
+import com.njcn.kafkadata.service.HarmonicDataService;
+import com.njcn.kafkadata.service.KafkaProducerService;
+import com.njcn.kafkadata.util.TimestampFileUtil;
+import lombok.RequiredArgsConstructor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Description:
+ * Date: 2025/05/28 上午 8:38【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+@Service
+@RequiredArgsConstructor
+public class HarmonicDataServiceImpl implements HarmonicDataService {
+
+ private final static String PACKAGE_PREFIX = "com.njcn.influx.imapper.";
+ private final static String PACKAGE_SUFFIX = "Mapper";
+
+ private final static String TOPIC = "harmonic_data";
+ private final static String PACKAGE_PREFIX_BEAN ="com.njcn.influx.pojo.po." ;
+ private final KafkaProducerService kafkaProducerService;
+
+ @Value("${filePath}")
+ private String filePath;
+
+ @Override
+ public void push(PushParam pushParam) throws Exception {
+ String beginTime = pushParam.getBeginTime();
+ String endTime = pushParam.getEndTime();
+
+ String timestamp = TimestampFileUtil.getTimestamp(filePath);
+ if(!StringUtils.isEmpty(timestamp)){
+ if(timestamp.compareTo(endTime)<=0){
+ beginTime=timestamp;
+ }else {
+ throw new RuntimeException(pushParam.getBeginTime()+"-"+pushParam.getEndTime()+"时间段数据已进行过推送至Kafka目前已数据已推送至"+timestamp);
+ }
+ }
+ for (TableEnum value : TableEnum.values()) {
+ String className = value.getCode();
+ String tableName = value.getTableName();
+ //查询数据
+ InfluxDbBaseMapper mapper = (InfluxDbBaseMapper) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + className + PACKAGE_SUFFIX));
+ System.out.println(PACKAGE_PREFIX + className + PACKAGE_SUFFIX);
+ InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(Class.forName(PACKAGE_PREFIX_BEAN+className));
+ influxQueryWrapper
+ .between(InfluxDBTableConstant.TIME, beginTime, endTime);
+ List list1 = mapper.selectByQueryWrapper(influxQueryWrapper);
+ list1.forEach(temp->{
+ HarmonicMessageDTO harmonicMessageDTO = new HarmonicMessageDTO();
+ harmonicMessageDTO.setTableName(tableName);
+ harmonicMessageDTO.setDataJson(JSONUtil.toJsonStr(temp));
+ String jsonStr = JSONUtil.toJsonStr(harmonicMessageDTO);
+ kafkaProducerService.sendMessage(TOPIC,jsonStr);
+ });
+ }
+ TimestampFileUtil.updateTimestamp(endTime,filePath);
+ }
+}
diff --git a/src/main/java/com/njcn/kafkadata/service/impl/KafkaConsumerService.java b/src/main/java/com/njcn/kafkadata/service/impl/KafkaConsumerService.java
new file mode 100644
index 0000000..d5e81c9
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/service/impl/KafkaConsumerService.java
@@ -0,0 +1,25 @@
+//package com.njcn.kafkadata.service.impl;
+//
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.stereotype.Service;
+//
+//import java.util.List;
+//
+///**
+// * Description:
+// * Date: 2025/05/28 上午 11:13【需求编号】
+// *
+// * @author clam
+// * @version V1.0.0
+// */
+//@Service
+//public class KafkaConsumerService {
+//
+// @KafkaListener(topics = "my-topic", groupId = "my-group")
+// public void consume(String message) {
+// System.out.println("Consumed message: " + message);
+// }
+//
+// // 批量消费
+//
+//}
diff --git a/src/main/java/com/njcn/kafkadata/service/impl/KafkaProducerServiceImpl.java b/src/main/java/com/njcn/kafkadata/service/impl/KafkaProducerServiceImpl.java
new file mode 100644
index 0000000..4ee7d2d
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/service/impl/KafkaProducerServiceImpl.java
@@ -0,0 +1,50 @@
+package com.njcn.kafkadata.service.impl;
+
+import com.njcn.kafkadata.service.KafkaProducerService;
+import lombok.RequiredArgsConstructor;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+/**
+ * Description:
+ * Date: 2025/05/27 下午 2:00【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+@Service
+@RequiredArgsConstructor
+public class KafkaProducerServiceImpl implements KafkaProducerService {
+
+ private final KafkaTemplate kafkaTemplate;
+
+
+ @Override
+ public void sendMessage(String topic, String message) {
+ kafkaTemplate.send(topic, message);
+ }
+ @Override
+ // 带回调的发送
+ public void sendMessageWithCallback(String topic, String message) {
+ ListenableFuture> future = kafkaTemplate.send(topic, message);
+
+ future.addCallback(new ListenableFutureCallback>() {
+
+
+ @Override
+ public void onSuccess(SendResult result) {
+ System.out.println("Sent message=[" + message +
+ "] with offset=[" + result.getRecordMetadata().offset() + "]");
+ }
+
+ @Override
+ public void onFailure(Throwable ex) {
+ System.out.println("Unable to send message=[" +
+ message + "] due to : " + ex.getMessage());
+ }
+ });
+ }
+}
diff --git a/src/main/java/com/njcn/kafkadata/util/TimestampFileUtil.java b/src/main/java/com/njcn/kafkadata/util/TimestampFileUtil.java
new file mode 100644
index 0000000..53677bc
--- /dev/null
+++ b/src/main/java/com/njcn/kafkadata/util/TimestampFileUtil.java
@@ -0,0 +1,68 @@
+package com.njcn.kafkadata.util;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Service;
+
+import java.io.*;
+import java.net.URL;
+import java.nio.file.*;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * Description:
+ * Date: 2025/05/28 上午 11:30【需求编号】
+ *
+ * @author clam
+ * @version V1.0.0
+ */
+
+public class TimestampFileUtil {
+
+ private static final String FILENAME = "time.txt";
+
+ /**
+ * 更新时间戳文件
+ */
+ public static void updateTimestamp(String endTime,String resourcePath) throws IOException {
+ Path filePath = Paths.get(resourcePath, FILENAME);
+
+ // 如果文件不存在,创建它;存在则覆盖
+ Files.write(filePath, endTime.getBytes());
+ }
+
+ /**
+ * 获取当前时间戳
+ */
+ public static String getTimestamp(String resourcePath) throws IOException {
+ Path filePath = Paths.get(resourcePath, FILENAME);
+ if (!Files.exists(filePath)) {
+ return null;
+ }
+ return new String(Files.readAllBytes(filePath)).trim();
+ }
+
+
+
+
+ public static void main(String[] args) {
+ try {
+ // 获取当前时间戳(如果有)
+ String currentTimestamp = getTimestamp("D:");
+ if (currentTimestamp != null) {
+ System.out.println("当前文件中的时间: " + currentTimestamp);
+ } else {
+ System.out.println("时间戳文件不存在,将创建新文件");
+ }
+
+ // 更新时间戳
+ updateTimestamp("2020-01-02 00:00:00","D:");
+
+ // 打印新时间戳
+ System.out.println("更新时间戳为: " + getTimestamp("D:"));
+ } catch (IOException e) {
+ System.err.println("操作时间戳文件失败: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..a2d805d
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,38 @@
+server:
+ port: 10601
+ tomcat:
+ uri-encoding: UTF-8
+spring:
+ application:
+ name: kafka-harmonic-data
+ kafka:
+ # Kafka 服务器地址
+ bootstrap-servers: 10.121.17.201:9092,10.121.17.202:9092,10.121.17.203:9092
+
+ # 生产者配置
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ # 可选配置
+ acks: all
+ retries: 3
+ batch-size: 16384
+ buffer-memory: 33554432
+
+ # 消费者配置
+ consumer:
+ group-id: my-group
+ auto-offset-reset: earliest
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ # 可选配置
+ enable-auto-commit: false
+ max-poll-records: 500
+ #influxDB内容配置
+ influx:
+ url: http://198.120.100.196:8086
+ user: pqsadmin
+ password: Pqsadmin123
+ database: PQSBASE
+ mapper-location: com.njcn.**.imapper
+filePath: /usr/kafkaproject