commit a4b29c6b76bb30807b9b4cc6871c823ddbe6a11a Author: hzj <826100833@qq.com> Date: Sat Oct 11 10:16:47 2025 +0800 初始版本提交 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..6d5df4b --- /dev/null +++ b/pom.xml @@ -0,0 +1,92 @@ + + + 4.0.0 + + com.njcn + kafka-harmonic-data + 1.0-SNAPSHOT + + org.springframework.boot + spring-boot-starter-parent + 2.3.12.RELEASE + + + 8 + 8 + UTF-8 + + + + + + + org.springframework.boot + spring-boot-starter-web + 2.3.12.RELEASE + + + + org.springframework.kafka + spring-kafka + 2.5.14.RELEASE + + + org.projectlombok + lombok + 1.18.18 + + + + com.njcn + pqs-influx + 1.0.0 + + + + + + + + kafka-harmonic-data + + + org.springframework.boot + spring-boot-maven-plugin + + + package + + repackage + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + true + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/njcn/kafkadata/KafkaDataApplication.java b/src/main/java/com/njcn/kafkadata/KafkaDataApplication.java new file mode 100644 index 0000000..a8e1600 --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/KafkaDataApplication.java @@ -0,0 +1,25 @@ +package com.njcn.kafkadata; + + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; +import org.springframework.boot.autoconfigure.security.servlet.SecurityFilterAutoConfiguration; +import org.springframework.context.annotation.DependsOn; + +/** + * pqs + * + * @author cdf + * @date 2022/11/10 + */ + +@SpringBootApplication +@DependsOn("proxyMapperRegister") +public class KafkaDataApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaDataApplication.class, args); + } + +} diff --git a/src/main/java/com/njcn/kafkadata/controller/HarmonicDataController.java b/src/main/java/com/njcn/kafkadata/controller/HarmonicDataController.java new file mode 100644 index 0000000..65983b2 --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/controller/HarmonicDataController.java @@ -0,0 +1,26 @@ +package com.njcn.kafkadata.controller; + +import com.njcn.kafkadata.pojo.param.PushParam; +import com.njcn.kafkadata.service.HarmonicDataService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; + +/** + * Description: + * Date: 2025/05/27 下午 2:56【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Slf4j +@RestController +@RequestMapping("/harmonicdata") +@AllArgsConstructor +public class HarmonicDataController { + private final HarmonicDataService harmonicDataService; + @PostMapping("/push") + public void push(@RequestBody PushParam pushParam) throws Exception { + harmonicDataService.push(pushParam); + } +} diff --git a/src/main/java/com/njcn/kafkadata/controller/KafkaTestController.java b/src/main/java/com/njcn/kafkadata/controller/KafkaTestController.java new file mode 100644 index 0000000..e444ba5 --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/controller/KafkaTestController.java @@ -0,0 +1,28 @@ +package com.njcn.kafkadata.controller; + +import com.njcn.kafkadata.service.KafkaProducerService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; + +/** + * Description: + * Date: 2025/05/27 下午 2:06【需求编号】 + * + * @author clam + * @version V1.0.0 + */ + +@Slf4j +@RestController +@RequestMapping("/test") +@AllArgsConstructor +public class KafkaTestController { + private final KafkaProducerService kafkaProducerService; + + @GetMapping("/testkafka") + + public void test(@RequestParam("context") String context) { + kafkaProducerService.sendMessageWithCallback("harmonic-data",context); + } +} diff --git a/src/main/java/com/njcn/kafkadata/job/PushKafkaJob.java b/src/main/java/com/njcn/kafkadata/job/PushKafkaJob.java new file mode 100644 index 0000000..ec5b747 --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/job/PushKafkaJob.java @@ -0,0 +1,38 @@ +package com.njcn.kafkadata.job; + +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.LocalDateTimeUtil; +import com.njcn.kafkadata.pojo.param.PushParam; +import com.njcn.kafkadata.service.HarmonicDataService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * Description: + * Date: 2025/05/28 下午 2:34【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Component +@EnableScheduling +@RequiredArgsConstructor +@Slf4j +public class PushKafkaJob { + private final HarmonicDataService harmonicDataService; + + @Scheduled(cron="0 0/5 * * * ?") + public void pqOnlinerate() throws Exception { + PushParam pushParam = new PushParam(); + // 获取当前时间 + LocalDateTime now = LocalDateTime.now(); + pushParam.setBeginTime(LocalDateTimeUtil.format(now.minusMinutes(10), DatePattern.NORM_DATETIME_PATTERN)); + pushParam.setEndTime(LocalDateTimeUtil.format(now.minusMinutes(5), DatePattern.NORM_DATETIME_PATTERN)); + harmonicDataService.push(pushParam); + } +} diff --git a/src/main/java/com/njcn/kafkadata/pojo/dto/HarmonicMessageDTO.java b/src/main/java/com/njcn/kafkadata/pojo/dto/HarmonicMessageDTO.java new file mode 100644 index 0000000..31a46d0 --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/pojo/dto/HarmonicMessageDTO.java @@ -0,0 +1,17 @@ +package com.njcn.kafkadata.pojo.dto; + +import lombok.Data; + +/** + * Description: + * Date: 2025/05/27 下午 3:32【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class HarmonicMessageDTO { + private String tableName; + private String dataJson; + +} diff --git a/src/main/java/com/njcn/kafkadata/pojo/param/PushParam.java b/src/main/java/com/njcn/kafkadata/pojo/param/PushParam.java new file mode 100644 index 0000000..3d2de8c --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/pojo/param/PushParam.java @@ -0,0 +1,34 @@ +package com.njcn.kafkadata.pojo.param; + +import lombok.Data; + +import java.util.List; +import java.util.Set; + +/** + * Description: + * Date: 2025/05/28 上午 8:47【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class PushParam { + + //补招起始日期_yyyy-MM-dd(按小时跑的任务可加时分秒 + private String beginTime; + + //补招截止日期_yyyy-MM-dd(按小时跑的任务可加时分秒) + private String endTime; + + //时间日期_yyyy-MM-dd(按小时跑的任务可加时分秒) + private String dataDate; + + + + /** + * 待计算的对象索引集合,监测点、设备、母线、变电站、单位等等 + */ + private List idList; + +} diff --git a/src/main/java/com/njcn/kafkadata/pojo/param/TableEnum.java b/src/main/java/com/njcn/kafkadata/pojo/param/TableEnum.java new file mode 100644 index 0000000..2eba3e7 --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/pojo/param/TableEnum.java @@ -0,0 +1,73 @@ +package com.njcn.kafkadata.pojo.param; + +import lombok.Getter; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Description: + * Date: 2024/1/4 13:54【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Getter +public enum TableEnum { + DATAFLICKER("DataFlicker","电压闪变数据表", "data_flicker"), + DATAFLUC("DataFluc","电压波动数据表", "data_fluc"), + DATAHARMPHASICI("DataHarmPhasicI","谐波电流角度数据表", "data_harmphasic_i"), + DATAHARMPHASICV("DataHarmPhasicV","谐波电压角度数据表", "data_harmphasic_v"), + DATAHARMPOWERP("DataHarmPowerP","有功功率数据表", "data_harmpower_p"), + DATAHARMPOWERQ("DataHarmPowerQ","无功功率数据表", "data_harmpower_q"), + DATAHARMPOWERS("DataHarmPowerS","视在功率数据表", "data_harmpower_s"), + DATAHARMRATEI("DataHarmRateI","谐波电流含有率数据表", "data_harmrate_i"), + DATAHARMRATEV("DataHarmRateV","谐波电压含有率数据表", "data_harmrate_v"), + DATAINHARMI("DataInHarmI","电流简谐波幅值数据表", "data_inharm_i"), + DATAINHARMV("DataInHarmV","电压间谐波幅值数据表", "data_inharm_v"), + DATAI("DataI","谐波电流幅值数据表", "data_i"), + DATAPLT("DataPlt","长时闪变数据表", "data_plt"), + DATAV("DataV","谐波电压幅值数据表", "data_v"), + ; + + + private final String code; + private final String value; + + private final String tableName; + + TableEnum(String code, String value, String tableName) { + this.code = code; + this.value = value; + this.tableName = tableName; + } + + /** + * 获取所有可执行的组件类型, + * 除了"目录"类型,其他都可以执行 + * @return + */ + public static List getExecutableTypes() { + return Arrays.stream(TableEnum.values()).map(tmep->{ + return tmep.code; + }).collect(Collectors.toList()); + } + + + /** + * 通过code获取枚举值 + * @param code + * @return + */ + public static String getValueByCode(String code) { + for (TableEnum item : TableEnum.values()) { + if (item.code.equals(code)) { + return item.value; + } + } + return null; + } + + +} diff --git a/src/main/java/com/njcn/kafkadata/service/HarmonicDataService.java b/src/main/java/com/njcn/kafkadata/service/HarmonicDataService.java new file mode 100644 index 0000000..8f0fa11 --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/service/HarmonicDataService.java @@ -0,0 +1,14 @@ +package com.njcn.kafkadata.service; + +import com.njcn.kafkadata.pojo.param.PushParam; + +/** + * Description: + * Date: 2025/05/28 上午 8:38【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface HarmonicDataService { + void push(PushParam pushParam) throws Exception; +} diff --git a/src/main/java/com/njcn/kafkadata/service/KafkaProducerService.java b/src/main/java/com/njcn/kafkadata/service/KafkaProducerService.java new file mode 100644 index 0000000..760f854 --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/service/KafkaProducerService.java @@ -0,0 +1,15 @@ +package com.njcn.kafkadata.service; + +/** + * Description: + * Date: 2025/05/27 下午 1:57【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface KafkaProducerService { + void sendMessage(String topic, String message); + + // 带回调的发送 + void sendMessageWithCallback(String topic, String message); +} diff --git a/src/main/java/com/njcn/kafkadata/service/impl/HarmonicDataServiceImpl.java b/src/main/java/com/njcn/kafkadata/service/impl/HarmonicDataServiceImpl.java new file mode 100644 index 0000000..4b08167 --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/service/impl/HarmonicDataServiceImpl.java @@ -0,0 +1,76 @@ +package com.njcn.kafkadata.service.impl; + +import cn.hutool.extra.spring.SpringUtil; +import cn.hutool.json.JSONUtil; +import com.njcn.influx.base.InfluxDbBaseMapper; +import com.njcn.influx.pojo.constant.InfluxDBTableConstant; +import com.njcn.influx.query.InfluxQueryWrapper; +import com.njcn.kafkadata.pojo.dto.HarmonicMessageDTO; +import com.njcn.kafkadata.pojo.param.PushParam; +import com.njcn.kafkadata.pojo.param.TableEnum; +import com.njcn.kafkadata.service.HarmonicDataService; +import com.njcn.kafkadata.service.KafkaProducerService; +import com.njcn.kafkadata.util.TimestampFileUtil; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * Description: + * Date: 2025/05/28 上午 8:38【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Service +@RequiredArgsConstructor +public class HarmonicDataServiceImpl implements HarmonicDataService { + + private final static String PACKAGE_PREFIX = "com.njcn.influx.imapper."; + private final static String PACKAGE_SUFFIX = "Mapper"; + + private final static String TOPIC = "harmonic_data"; + private final static String PACKAGE_PREFIX_BEAN ="com.njcn.influx.pojo.po." ; + private final KafkaProducerService kafkaProducerService; + + @Value("${filePath}") + private String filePath; + + @Override + public void push(PushParam pushParam) throws Exception { + String beginTime = pushParam.getBeginTime(); + String endTime = pushParam.getEndTime(); + + String timestamp = TimestampFileUtil.getTimestamp(filePath); + if(!StringUtils.isEmpty(timestamp)){ + if(timestamp.compareTo(endTime)<=0){ + beginTime=timestamp; + }else { + throw new RuntimeException(pushParam.getBeginTime()+"-"+pushParam.getEndTime()+"时间段数据已进行过推送至Kafka目前已数据已推送至"+timestamp); + } + } + for (TableEnum value : TableEnum.values()) { + String className = value.getCode(); + String tableName = value.getTableName(); + //查询数据 + InfluxDbBaseMapper mapper = (InfluxDbBaseMapper) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + className + PACKAGE_SUFFIX)); + System.out.println(PACKAGE_PREFIX + className + PACKAGE_SUFFIX); + InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(Class.forName(PACKAGE_PREFIX_BEAN+className)); + influxQueryWrapper + .between(InfluxDBTableConstant.TIME, beginTime, endTime); + List list1 = mapper.selectByQueryWrapper(influxQueryWrapper); + list1.forEach(temp->{ + HarmonicMessageDTO harmonicMessageDTO = new HarmonicMessageDTO(); + harmonicMessageDTO.setTableName(tableName); + harmonicMessageDTO.setDataJson(JSONUtil.toJsonStr(temp)); + String jsonStr = JSONUtil.toJsonStr(harmonicMessageDTO); + kafkaProducerService.sendMessage(TOPIC,jsonStr); + }); + } + TimestampFileUtil.updateTimestamp(endTime,filePath); + } +} diff --git a/src/main/java/com/njcn/kafkadata/service/impl/KafkaConsumerService.java b/src/main/java/com/njcn/kafkadata/service/impl/KafkaConsumerService.java new file mode 100644 index 0000000..d5e81c9 --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/service/impl/KafkaConsumerService.java @@ -0,0 +1,25 @@ +//package com.njcn.kafkadata.service.impl; +// +//import org.springframework.kafka.annotation.KafkaListener; +//import org.springframework.stereotype.Service; +// +//import java.util.List; +// +///** +// * Description: +// * Date: 2025/05/28 上午 11:13【需求编号】 +// * +// * @author clam +// * @version V1.0.0 +// */ +//@Service +//public class KafkaConsumerService { +// +// @KafkaListener(topics = "my-topic", groupId = "my-group") +// public void consume(String message) { +// System.out.println("Consumed message: " + message); +// } +// +// // 批量消费 +// +//} diff --git a/src/main/java/com/njcn/kafkadata/service/impl/KafkaProducerServiceImpl.java b/src/main/java/com/njcn/kafkadata/service/impl/KafkaProducerServiceImpl.java new file mode 100644 index 0000000..4ee7d2d --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/service/impl/KafkaProducerServiceImpl.java @@ -0,0 +1,50 @@ +package com.njcn.kafkadata.service.impl; + +import com.njcn.kafkadata.service.KafkaProducerService; +import lombok.RequiredArgsConstructor; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Service; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +/** + * Description: + * Date: 2025/05/27 下午 2:00【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Service +@RequiredArgsConstructor +public class KafkaProducerServiceImpl implements KafkaProducerService { + + private final KafkaTemplate kafkaTemplate; + + + @Override + public void sendMessage(String topic, String message) { + kafkaTemplate.send(topic, message); + } + @Override + // 带回调的发送 + public void sendMessageWithCallback(String topic, String message) { + ListenableFuture> future = kafkaTemplate.send(topic, message); + + future.addCallback(new ListenableFutureCallback>() { + + + @Override + public void onSuccess(SendResult result) { + System.out.println("Sent message=[" + message + + "] with offset=[" + result.getRecordMetadata().offset() + "]"); + } + + @Override + public void onFailure(Throwable ex) { + System.out.println("Unable to send message=[" + + message + "] due to : " + ex.getMessage()); + } + }); + } +} diff --git a/src/main/java/com/njcn/kafkadata/util/TimestampFileUtil.java b/src/main/java/com/njcn/kafkadata/util/TimestampFileUtil.java new file mode 100644 index 0000000..53677bc --- /dev/null +++ b/src/main/java/com/njcn/kafkadata/util/TimestampFileUtil.java @@ -0,0 +1,68 @@ +package com.njcn.kafkadata.util; + +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +import java.io.*; +import java.net.URL; +import java.nio.file.*; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +/** + * Description: + * Date: 2025/05/28 上午 11:30【需求编号】 + * + * @author clam + * @version V1.0.0 + */ + +public class TimestampFileUtil { + + private static final String FILENAME = "time.txt"; + + /** + * 更新时间戳文件 + */ + public static void updateTimestamp(String endTime,String resourcePath) throws IOException { + Path filePath = Paths.get(resourcePath, FILENAME); + + // 如果文件不存在,创建它;存在则覆盖 + Files.write(filePath, endTime.getBytes()); + } + + /** + * 获取当前时间戳 + */ + public static String getTimestamp(String resourcePath) throws IOException { + Path filePath = Paths.get(resourcePath, FILENAME); + if (!Files.exists(filePath)) { + return null; + } + return new String(Files.readAllBytes(filePath)).trim(); + } + + + + + public static void main(String[] args) { + try { + // 获取当前时间戳(如果有) + String currentTimestamp = getTimestamp("D:"); + if (currentTimestamp != null) { + System.out.println("当前文件中的时间: " + currentTimestamp); + } else { + System.out.println("时间戳文件不存在,将创建新文件"); + } + + // 更新时间戳 + updateTimestamp("2020-01-02 00:00:00","D:"); + + // 打印新时间戳 + System.out.println("更新时间戳为: " + getTimestamp("D:")); + } catch (IOException e) { + System.err.println("操作时间戳文件失败: " + e.getMessage()); + e.printStackTrace(); + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..a2d805d --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,38 @@ +server: + port: 10601 + tomcat: + uri-encoding: UTF-8 +spring: + application: + name: kafka-harmonic-data + kafka: + # Kafka 服务器地址 + bootstrap-servers: 10.121.17.201:9092,10.121.17.202:9092,10.121.17.203:9092 + + # 生产者配置 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + # 可选配置 + acks: all + retries: 3 + batch-size: 16384 + buffer-memory: 33554432 + + # 消费者配置 + consumer: + group-id: my-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + # 可选配置 + enable-auto-commit: false + max-poll-records: 500 + #influxDB内容配置 + influx: + url: http://198.120.100.196:8086 + user: pqsadmin + password: Pqsadmin123 + database: PQSBASE + mapper-location: com.njcn.**.imapper +filePath: /usr/kafkaproject