初始版本提交

This commit is contained in:
hzj
2025-10-11 10:16:47 +08:00
commit a4b29c6b76
16 changed files with 657 additions and 0 deletions

38
.gitignore vendored Normal file
View File

@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

92
pom.xml Normal file
View File

@@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.njcn</groupId>
<artifactId>kafka-harmonic-data</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version> <!-- 最新 2.x 版本 -->
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.14.RELEASE</version> <!-- 与Spring Boot 2.3.x兼容 -->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.18</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>pqs-influx</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<finalName>kafka-harmonic-data</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,25 @@
package com.njcn.kafkadata;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.boot.autoconfigure.security.servlet.SecurityFilterAutoConfiguration;
import org.springframework.context.annotation.DependsOn;
/**
* pqs
*
* @author cdf
* @date 2022/11/10
*/
@SpringBootApplication
@DependsOn("proxyMapperRegister")
public class KafkaDataApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDataApplication.class, args);
}
}

View File

@@ -0,0 +1,26 @@
package com.njcn.kafkadata.controller;
import com.njcn.kafkadata.pojo.param.PushParam;
import com.njcn.kafkadata.service.HarmonicDataService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
/**
* Description:
* Date: 2025/05/27 下午 2:56【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Slf4j
@RestController
@RequestMapping("/harmonicdata")
@AllArgsConstructor
public class HarmonicDataController {
private final HarmonicDataService harmonicDataService;
@PostMapping("/push")
public void push(@RequestBody PushParam pushParam) throws Exception {
harmonicDataService.push(pushParam);
}
}

View File

@@ -0,0 +1,28 @@
package com.njcn.kafkadata.controller;
import com.njcn.kafkadata.service.KafkaProducerService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
/**
* Description:
* Date: 2025/05/27 下午 2:06【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Slf4j
@RestController
@RequestMapping("/test")
@AllArgsConstructor
public class KafkaTestController {
private final KafkaProducerService kafkaProducerService;
@GetMapping("/testkafka")
public void test(@RequestParam("context") String context) {
kafkaProducerService.sendMessageWithCallback("harmonic-data",context);
}
}

View File

@@ -0,0 +1,38 @@
package com.njcn.kafkadata.job;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.njcn.kafkadata.pojo.param.PushParam;
import com.njcn.kafkadata.service.HarmonicDataService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* Description:
* Date: 2025/05/28 下午 2:34【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
@EnableScheduling
@RequiredArgsConstructor
@Slf4j
public class PushKafkaJob {
private final HarmonicDataService harmonicDataService;
@Scheduled(cron="0 0/5 * * * ?")
public void pqOnlinerate() throws Exception {
PushParam pushParam = new PushParam();
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
pushParam.setBeginTime(LocalDateTimeUtil.format(now.minusMinutes(10), DatePattern.NORM_DATETIME_PATTERN));
pushParam.setEndTime(LocalDateTimeUtil.format(now.minusMinutes(5), DatePattern.NORM_DATETIME_PATTERN));
harmonicDataService.push(pushParam);
}
}

View File

@@ -0,0 +1,17 @@
package com.njcn.kafkadata.pojo.dto;
import lombok.Data;
/**
* Description:
* Date: 2025/05/27 下午 3:32【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class HarmonicMessageDTO {
private String tableName;
private String dataJson;
}

View File

@@ -0,0 +1,34 @@
package com.njcn.kafkadata.pojo.param;
import lombok.Data;
import java.util.List;
import java.util.Set;
/**
* Description:
* Date: 2025/05/28 上午 8:47【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class PushParam {
//补招起始日期_yyyy-MM-dd(按小时跑的任务可加时分秒
private String beginTime;
//补招截止日期_yyyy-MM-dd(按小时跑的任务可加时分秒)
private String endTime;
//时间日期_yyyy-MM-dd(按小时跑的任务可加时分秒)
private String dataDate;
/**
* 待计算的对象索引集合,监测点、设备、母线、变电站、单位等等
*/
private List<String> idList;
}

View File

@@ -0,0 +1,73 @@
package com.njcn.kafkadata.pojo.param;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* Description:
* Date: 2024/1/4 13:54【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Getter
public enum TableEnum {
DATAFLICKER("DataFlicker","电压闪变数据表", "data_flicker"),
DATAFLUC("DataFluc","电压波动数据表", "data_fluc"),
DATAHARMPHASICI("DataHarmPhasicI","谐波电流角度数据表", "data_harmphasic_i"),
DATAHARMPHASICV("DataHarmPhasicV","谐波电压角度数据表", "data_harmphasic_v"),
DATAHARMPOWERP("DataHarmPowerP","有功功率数据表", "data_harmpower_p"),
DATAHARMPOWERQ("DataHarmPowerQ","无功功率数据表", "data_harmpower_q"),
DATAHARMPOWERS("DataHarmPowerS","视在功率数据表", "data_harmpower_s"),
DATAHARMRATEI("DataHarmRateI","谐波电流含有率数据表", "data_harmrate_i"),
DATAHARMRATEV("DataHarmRateV","谐波电压含有率数据表", "data_harmrate_v"),
DATAINHARMI("DataInHarmI","电流简谐波幅值数据表", "data_inharm_i"),
DATAINHARMV("DataInHarmV","电压间谐波幅值数据表", "data_inharm_v"),
DATAI("DataI","谐波电流幅值数据表", "data_i"),
DATAPLT("DataPlt","长时闪变数据表", "data_plt"),
DATAV("DataV","谐波电压幅值数据表", "data_v"),
;
private final String code;
private final String value;
private final String tableName;
TableEnum(String code, String value, String tableName) {
this.code = code;
this.value = value;
this.tableName = tableName;
}
/**
* 获取所有可执行的组件类型,
* 除了"目录"类型,其他都可以执行
* @return
*/
public static List<String> getExecutableTypes() {
return Arrays.stream(TableEnum.values()).map(tmep->{
return tmep.code;
}).collect(Collectors.toList());
}
/**
* 通过code获取枚举值
* @param code
* @return
*/
public static String getValueByCode(String code) {
for (TableEnum item : TableEnum.values()) {
if (item.code.equals(code)) {
return item.value;
}
}
return null;
}
}

View File

@@ -0,0 +1,14 @@
package com.njcn.kafkadata.service;
import com.njcn.kafkadata.pojo.param.PushParam;
/**
* Description:
* Date: 2025/05/28 上午 8:38【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface HarmonicDataService {
void push(PushParam pushParam) throws Exception;
}

View File

@@ -0,0 +1,15 @@
package com.njcn.kafkadata.service;
/**
* Description:
* Date: 2025/05/27 下午 1:57【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface KafkaProducerService {
void sendMessage(String topic, String message);
// 带回调的发送
void sendMessageWithCallback(String topic, String message);
}

View File

@@ -0,0 +1,76 @@
package com.njcn.kafkadata.service.impl;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONUtil;
import com.njcn.influx.base.InfluxDbBaseMapper;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influx.query.InfluxQueryWrapper;
import com.njcn.kafkadata.pojo.dto.HarmonicMessageDTO;
import com.njcn.kafkadata.pojo.param.PushParam;
import com.njcn.kafkadata.pojo.param.TableEnum;
import com.njcn.kafkadata.service.HarmonicDataService;
import com.njcn.kafkadata.service.KafkaProducerService;
import com.njcn.kafkadata.util.TimestampFileUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* Description:
* Date: 2025/05/28 上午 8:38【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
@RequiredArgsConstructor
public class HarmonicDataServiceImpl implements HarmonicDataService {
private final static String PACKAGE_PREFIX = "com.njcn.influx.imapper.";
private final static String PACKAGE_SUFFIX = "Mapper";
private final static String TOPIC = "harmonic_data";
private final static String PACKAGE_PREFIX_BEAN ="com.njcn.influx.pojo.po." ;
private final KafkaProducerService kafkaProducerService;
@Value("${filePath}")
private String filePath;
@Override
public void push(PushParam pushParam) throws Exception {
String beginTime = pushParam.getBeginTime();
String endTime = pushParam.getEndTime();
String timestamp = TimestampFileUtil.getTimestamp(filePath);
if(!StringUtils.isEmpty(timestamp)){
if(timestamp.compareTo(endTime)<=0){
beginTime=timestamp;
}else {
throw new RuntimeException(pushParam.getBeginTime()+"-"+pushParam.getEndTime()+"时间段数据已进行过推送至Kafka目前已数据已推送至"+timestamp);
}
}
for (TableEnum value : TableEnum.values()) {
String className = value.getCode();
String tableName = value.getTableName();
//查询数据
InfluxDbBaseMapper mapper = (InfluxDbBaseMapper) SpringUtil.getBean(Class.forName(PACKAGE_PREFIX + className + PACKAGE_SUFFIX));
System.out.println(PACKAGE_PREFIX + className + PACKAGE_SUFFIX);
InfluxQueryWrapper influxQueryWrapper = new InfluxQueryWrapper(Class.forName(PACKAGE_PREFIX_BEAN+className));
influxQueryWrapper
.between(InfluxDBTableConstant.TIME, beginTime, endTime);
List list1 = mapper.selectByQueryWrapper(influxQueryWrapper);
list1.forEach(temp->{
HarmonicMessageDTO harmonicMessageDTO = new HarmonicMessageDTO();
harmonicMessageDTO.setTableName(tableName);
harmonicMessageDTO.setDataJson(JSONUtil.toJsonStr(temp));
String jsonStr = JSONUtil.toJsonStr(harmonicMessageDTO);
kafkaProducerService.sendMessage(TOPIC,jsonStr);
});
}
TimestampFileUtil.updateTimestamp(endTime,filePath);
}
}

View File

@@ -0,0 +1,25 @@
//package com.njcn.kafkadata.service.impl;
//
//import org.springframework.kafka.annotation.KafkaListener;
//import org.springframework.stereotype.Service;
//
//import java.util.List;
//
///**
// * Description:
// * Date: 2025/05/28 上午 11:13【需求编号】
// *
// * @author clam
// * @version V1.0.0
// */
//@Service
//public class KafkaConsumerService {
//
// @KafkaListener(topics = "my-topic", groupId = "my-group")
// public void consume(String message) {
// System.out.println("Consumed message: " + message);
// }
//
// // 批量消费
//
//}

View File

@@ -0,0 +1,50 @@
package com.njcn.kafkadata.service.impl;
import com.njcn.kafkadata.service.KafkaProducerService;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* Description:
* Date: 2025/05/27 下午 2:00【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
@RequiredArgsConstructor
public class KafkaProducerServiceImpl implements KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
@Override
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
@Override
// 带回调的发送
public void sendMessageWithCallback(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + message +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=[" +
message + "] due to : " + ex.getMessage());
}
});
}
}

View File

@@ -0,0 +1,68 @@
package com.njcn.kafkadata.util;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.io.*;
import java.net.URL;
import java.nio.file.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* Description:
* Date: 2025/05/28 上午 11:30【需求编号】
*
* @author clam
* @version V1.0.0
*/
public class TimestampFileUtil {
private static final String FILENAME = "time.txt";
/**
* 更新时间戳文件
*/
public static void updateTimestamp(String endTime,String resourcePath) throws IOException {
Path filePath = Paths.get(resourcePath, FILENAME);
// 如果文件不存在,创建它;存在则覆盖
Files.write(filePath, endTime.getBytes());
}
/**
* 获取当前时间戳
*/
public static String getTimestamp(String resourcePath) throws IOException {
Path filePath = Paths.get(resourcePath, FILENAME);
if (!Files.exists(filePath)) {
return null;
}
return new String(Files.readAllBytes(filePath)).trim();
}
public static void main(String[] args) {
try {
// 获取当前时间戳(如果有)
String currentTimestamp = getTimestamp("D:");
if (currentTimestamp != null) {
System.out.println("当前文件中的时间: " + currentTimestamp);
} else {
System.out.println("时间戳文件不存在,将创建新文件");
}
// 更新时间戳
updateTimestamp("2020-01-02 00:00:00","D:");
// 打印新时间戳
System.out.println("更新时间戳为: " + getTimestamp("D:"));
} catch (IOException e) {
System.err.println("操作时间戳文件失败: " + e.getMessage());
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,38 @@
server:
port: 10601
tomcat:
uri-encoding: UTF-8
spring:
application:
name: kafka-harmonic-data
kafka:
# Kafka 服务器地址
bootstrap-servers: 10.121.17.201:9092,10.121.17.202:9092,10.121.17.203:9092
# 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 可选配置
acks: all
retries: 3
batch-size: 16384
buffer-memory: 33554432
# 消费者配置
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 可选配置
enable-auto-commit: false
max-poll-records: 500
#influxDB内容配置
influx:
url: http://198.120.100.196:8086
user: pqsadmin
password: Pqsadmin123
database: PQSBASE
mapper-location: com.njcn.**.imapper
filePath: /usr/kafkaproject