From 271c178a0fbcdae34a8630aa4db06ba3deaa3353 Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Thu, 4 Jun 2026 14:58:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=B7=E5=8D=97=E6=9A=82=E9=99=8D=E6=8E=A8?= =?UTF-8?q?=E9=80=81=E8=87=B3Kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka-event-data/.gitignore | 38 ++++ kafka-event-data/pom.xml | 153 +++++++++++++++ .../kafka/event/KafkaEventtApplication.java | 17 ++ .../event/controller/EventDataController.java | 29 +++ .../event/controller/KafkaTestController.java | 31 ++++ .../mapper/OracleRmpEventDetailPOMapper.java | 20 ++ .../njcn/kafka/event/mapper/PqLineMapper.java | 27 +++ .../event/mapper/mapping/PqLineMapper.xml | 50 +++++ .../kafka/event/pojo/dto/EventPushDTO.java | 68 +++++++ .../event/pojo/dto/LedgerBaseInfoDTO.java | 43 +++++ .../kafka/event/pojo/param/PushParam.java | 34 ++++ .../event/pojo/po/OracleRmpEventDetailPO.java | 174 ++++++++++++++++++ .../com/njcn/kafka/event/pojo/po/PqLine.java | 133 +++++++++++++ .../kafka/event/service/EventDataService.java | 15 ++ .../event/service/KafkaProducerService.java | 15 ++ .../service/impl/EventDataServiceImpl.java | 74 ++++++++ .../impl/KafkaProducerServiceImpl.java | 50 +++++ .../src/main/resources/application-dev.yml | 88 +++++++++ .../src/main/resources/application.yml | 83 +++++++++ .../src/main/resources/logback.xml | 145 +++++++++++++++ .../src/main/resources/templates/hello.html | 12 ++ 21 files changed, 1299 insertions(+) create mode 100644 kafka-event-data/.gitignore create mode 100644 kafka-event-data/pom.xml create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/KafkaEventtApplication.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/controller/EventDataController.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/controller/KafkaTestController.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/OracleRmpEventDetailPOMapper.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/PqLineMapper.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/mapping/PqLineMapper.xml create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/dto/EventPushDTO.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/dto/LedgerBaseInfoDTO.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/param/PushParam.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/po/OracleRmpEventDetailPO.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/po/PqLine.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/service/EventDataService.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/service/KafkaProducerService.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/service/impl/EventDataServiceImpl.java create mode 100644 kafka-event-data/src/main/java/com/njcn/kafka/event/service/impl/KafkaProducerServiceImpl.java create mode 100644 kafka-event-data/src/main/resources/application-dev.yml create mode 100644 kafka-event-data/src/main/resources/application.yml create mode 100644 kafka-event-data/src/main/resources/logback.xml create mode 100644 kafka-event-data/src/main/resources/templates/hello.html diff --git a/kafka-event-data/.gitignore b/kafka-event-data/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/kafka-event-data/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/kafka-event-data/pom.xml b/kafka-event-data/pom.xml new file mode 100644 index 0000000..0b36126 --- /dev/null +++ b/kafka-event-data/pom.xml @@ -0,0 +1,153 @@ + + + 4.0.0 + + com.njcn + kafka-event-data + + + + com.njcn.product + CN_Product + 1.0.0 + + + + + + + com.njcn + njcn-common + 0.0.1 + + + + + + + + + + org.springframework.boot + spring-boot-starter-data-redis + + + io.lettuce + lettuce-core + + + + + redis.clients + jedis + + + + + + org.springframework.boot + spring-boot-starter-websocket + 2.7.12 + + + + + com.baomidou + dynamic-datasource-spring-boot-starter + 3.5.1 + + + + + com.njcn + spingboot2.3.12 + 2.3.12 + + + + com.njcn + mybatis-plus + 0.0.1 + + + + + com.oracle.database.jdbc + ojdbc8 + 21.6.0.0 + + + com.oracle.database.nls + orai18n + 21.1.0.0 + + + + + + + + + org.springframework.kafka + spring-kafka + 2.5.14.RELEASE + + + + + + com.google.guava + guava + 32.1.3-jre + + + + + + + + + kafka-event-data + + + org.springframework.boot + spring-boot-maven-plugin + + + package + + repackage + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + + + src/main/resources + + **/* + + + + src/main/java + + **/*.xml + + + + + + + diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/KafkaEventtApplication.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/KafkaEventtApplication.java new file mode 100644 index 0000000..9b61b40 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/KafkaEventtApplication.java @@ -0,0 +1,17 @@ +package com.njcn.kafka.event; + +import lombok.extern.slf4j.Slf4j; +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@Slf4j +@SpringBootApplication(scanBasePackages = "com.njcn") +@MapperScan("com.njcn.**.mapper") +public class KafkaEventtApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaEventtApplication.class, args); + } + +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/controller/EventDataController.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/controller/EventDataController.java new file mode 100644 index 0000000..bcef0f2 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/controller/EventDataController.java @@ -0,0 +1,29 @@ +package com.njcn.kafka.event.controller; + +import com.njcn.kafka.event.pojo.param.PushParam; +import com.njcn.kafka.event.service.EventDataService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * Description: + * Date: 2025/05/27 下午 2:56【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Slf4j +@RestController +@RequestMapping("/eventdata") +@AllArgsConstructor +public class EventDataController { + private final EventDataService eventDataService; + @PostMapping("/push") + public void push(@RequestBody PushParam pushParam) throws Exception { + eventDataService.push(pushParam); + } +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/controller/KafkaTestController.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/controller/KafkaTestController.java new file mode 100644 index 0000000..49f86d7 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/controller/KafkaTestController.java @@ -0,0 +1,31 @@ +package com.njcn.kafka.event.controller; + +import com.njcn.kafka.event.service.KafkaProducerService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Description: + * Date: 2025/05/27 下午 2:06【需求编号】 + * + * @author clam + * @version V1.0.0 + */ + +@Slf4j +@RestController +@RequestMapping("/test") +@AllArgsConstructor +public class KafkaTestController { + private final KafkaProducerService kafkaProducerService; + + @GetMapping("/testkafka") + + public void test(@RequestParam("context") String context) { + kafkaProducerService.sendMessageWithCallback("eventdata",context); + } +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/OracleRmpEventDetailPOMapper.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/OracleRmpEventDetailPOMapper.java new file mode 100644 index 0000000..7e0029c --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/OracleRmpEventDetailPOMapper.java @@ -0,0 +1,20 @@ +package com.njcn.kafka.event.mapper; + + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.kafka.event.pojo.po.OracleRmpEventDetailPO; + +import java.util.List; + +/** + * data-migration + * + * @author cdf + * @date 2024/2/19 + */ +public interface OracleRmpEventDetailPOMapper extends BaseMapper { + + + + +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/PqLineMapper.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/PqLineMapper.java new file mode 100644 index 0000000..2795170 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/PqLineMapper.java @@ -0,0 +1,27 @@ +package com.njcn.kafka.event.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +import com.njcn.kafka.event.pojo.dto.LedgerBaseInfoDTO; +import com.njcn.kafka.event.pojo.po.PqLine; +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +/** + * + * Description: + * Date: 2025/06/19 下午 1:43【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface PqLineMapper extends BaseMapper { + + + + List getBaseLedger(@Param("ids")List ids, @Param("searchValue")String searchValue); + + + +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/mapping/PqLineMapper.xml b/kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/mapping/PqLineMapper.xml new file mode 100644 index 0000000..2fe2aaa --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/mapper/mapping/PqLineMapper.xml @@ -0,0 +1,50 @@ + + + + + + + + + + + + + + + diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/dto/EventPushDTO.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/dto/EventPushDTO.java new file mode 100644 index 0000000..202f0d0 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/dto/EventPushDTO.java @@ -0,0 +1,68 @@ +package com.njcn.kafka.event.pojo.dto; + +import lombok.Data; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.List; + +/** + * Description: + * Date: 2026/06/04 上午 9:14【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class EventPushDTO { + /** + * type : + * time : 2025-10-09 13:25:14 + * body : [{"eventdetail_index":"d4963f8f-201f-443f-b293-dda8ef870ee6","lineid":"1591","timeid":"2025-10-09 13:25:14","ms":"509","describe":"","wavetype":"1","persisttime":"23.000","eventvalue":"0.6","eventreason":"","eventtype":"","gdname":"","bdname":"110kV皇后店变","busname":"XX名称","phasicType":"A","pointname":"232待用","wavePath":"192.168.1.102/3_20240515_163022_349"}] + */ + + private String type; + private LocalDateTime time; + private List body; + + @Data + public static class EventDTO { + /** + * eventdetail_index : d4963f8f-201f-443f-b293-dda8ef870ee6 + * lineid : 1591 + * timeid : 2025-10-09 13:25:14 + * ms : 509 + * describe : + * wavetype : 1 + * persisttime : 23.000 + * eventvalue : 0.6 + * eventreason : + * eventtype : + * gdname : + * bdname : 110kV皇后店变 + * busname : XX名称 + * phasicType : A + * pointname : 232待用 + * wavePath : 192.168.1.102/3_20240515_163022_349 + */ + + private String eventdetail_index; + private Integer lineid; + private LocalDateTime timeid; + private Integer ms; + private String describe; + private String wavetype; + private BigDecimal persisttime; + private BigDecimal eventvalue; + private String eventreason; + private String eventtype; + private String gdname; + private String bdname; + private String busname; + private String phasicType; + private String pointname; + private String wavePath; + + + } +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/dto/LedgerBaseInfoDTO.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/dto/LedgerBaseInfoDTO.java new file mode 100644 index 0000000..0009858 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/dto/LedgerBaseInfoDTO.java @@ -0,0 +1,43 @@ +package com.njcn.kafka.event.pojo.dto; + +import lombok.Data; + +/** + * @Author: cdf + * @CreateTime: 2025-06-25 + * @Description: + */ +@Data +public class LedgerBaseInfoDTO { + private String gdName; + private String gdIndex; + + private Integer lineId; + + private String lineName; + + private Integer busBarId; + + private String busBarName; + + private String scale; + + private Integer devId; + + private String devName; + + private String objName; + + private Integer stationId; + + private String stationName; + //通讯状态 + private Integer runFlag=0; + + private Integer eventCount; + private String ip; + + private int pt; + private int ct; + +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/param/PushParam.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/param/PushParam.java new file mode 100644 index 0000000..b7e4a83 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/param/PushParam.java @@ -0,0 +1,34 @@ +package com.njcn.kafka.event.pojo.param; + +import lombok.Data; + +import java.time.LocalDateTime; +import java.util.List; + +/** + * Description: + * Date: 2025/05/28 上午 8:47【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class PushParam { + + //补招起始日期_yyyy-MM-dd(按小时跑的任务可加时分秒 + private LocalDateTime beginTime; + + //补招截止日期_yyyy-MM-dd(按小时跑的任务可加时分秒) + private LocalDateTime endTime; + + //时间日期_yyyy-MM-dd(按小时跑的任务可加时分秒) + private String dataDate; + + + + /** + * 待计算的对象索引集合,监测点、设备、母线、变电站、单位等等 + */ + private List idList; + +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/po/OracleRmpEventDetailPO.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/po/OracleRmpEventDetailPO.java new file mode 100644 index 0000000..5716442 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/po/OracleRmpEventDetailPO.java @@ -0,0 +1,174 @@ +package com.njcn.kafka.event.pojo.po; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.Date; + +/** + * data-migration + * + * @author cdf + * @date 2024/2/19 + */ +@Data +@EqualsAndHashCode(callSuper = false) +@TableName("PQS_EVENTDETAIL") +public class OracleRmpEventDetailPO implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * 暂时事件ID + */ + @TableId(value = "EVENTDETAIL_INDEX",type = IdType.ASSIGN_ID) + private String eventdetailIndex; + + /** + * 监测点ID + */ + @TableField(value = "LINEID") + private Integer lineid; + + /** + * 统计指标类型 + */ + @TableField(value = "MS") + private Integer ms; + /** + * 统计指标类型 + */ + @TableField(value = "WAVETYPE") + private String wavetype; + + /** + * 暂降原因(字典表PQS_Dicdata) + */ + @TableField(value = "EVENTREASON") + private String eventreason; + + /** + * 暂降类型(字典表PQS_Dicdata) + */ + @TableField(value = "EVENTTYPE") + private String eventtype; + + /** + * 事件关联分析表Guid + */ + @TableField(value = "EVENTASS_INDEX") + private String eventassIndex; + + @TableField(value = "DQTIME") + private Double dqTime; + + /** + * 特征值计算更新时间(外键PQS_Relevance的Time字段) + */ + @TableField(value = "DEALTIME") + private Date dealTime; + + /** + * 默认事件个数为0 + */ + @TableField(value = "NUM") + private Integer num; + + /** + * 波形文件是否从装置招到本地(0:未招,1:已招)默认值为0 + */ + @TableField(value = "FILEFLAG") + private Integer fileFlag; + + /** + * 特征值计算标志(0,未处理;1,已处理; 2,已处理,无结果;3,计算失败)默认值为0 + */ + @TableField(value = "DEALFLAG") + private Integer dealFlag; + + /** + * 处理结果第一条事件发生时间(读comtra文件获取) + */ + @TableField(value = "FIRSTTIME") + private LocalDateTime firstTime; + + /** + * 处理结果第一条事件暂降类型(字典表PQS_Dicdata) + */ + @TableField(value = "FIRSTTYPE") + private String firstType; + + /** + * 处理结果第一条事件发生时间毫秒(读comtra文件获取) + */ + @TableField(value = "FIRSTMS") + private BigDecimal firstMs; + + /** + * 暂降能量 + */ + @TableField(value = "ENERGY") + private Double energy; + + /** + * 暂降严重度 + */ + @TableField(value = "SEVERITY") + private Double severity; + + /** + * 暂降源与监测位置关系 Upper:上游;Lower :下游;Unknown :未知;为空则是未计算 + */ + @TableField(value = "SAGSOURCE") + private String sagsource; + + /** + * 开始时间 + */ + @TableField(value = "TIMEID") + private LocalDateTime timeid; + + /** + * 持续时间,单位秒 + */ + @TableField(value = "PERSISTTIME") + private BigDecimal persisttime; + + /** + * 特征幅值 + */ + @TableField(value = "EVENTVALUE") + private BigDecimal eventvalue; + + /** + * 相别 + */ + @TableField(value = "PHASIC_TYPE") + private String phasicType; + + /** + * 事件描述 + */ + @TableField(value = "DESCRIBE") + private String describe; + + /** + * 波形路径 + */ + @TableField(value = "WAVENAME") + private String wavePath; + + + + @TableField(value = "TRANSIENTVALUE") + private Double transientValue; + + @TableField(value = "CREATE_TIME") + private LocalDateTime createTime; +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/po/PqLine.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/po/PqLine.java new file mode 100644 index 0000000..6a32f16 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/pojo/po/PqLine.java @@ -0,0 +1,133 @@ +package com.njcn.kafka.event.pojo.po; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * + * Description: + * Date: 2025/06/19 下午 1:43【需求编号】 + * + * @author clam + * @version V1.0.0 + */ + +/** + * 靠靠? + */ +@Data +@NoArgsConstructor +@TableName(value = "PQ_LINE") +public class PqLine { + /** + * 靠靠 + */ + @TableId(value = "LINE_INDEX", type = IdType.INPUT) + private Integer lineIndex; + + /** + * 靠靠靠 + */ + @TableField(value = "GD_INDEX") + private Integer gdIndex; + + /** + * 靠靠? + */ + @TableField(value = "SUB_INDEX") + private Integer subIndex; + + /** + * 靠靠 + */ + @TableField(value = "SUBV_INDEX") + private Integer subvIndex; + + /** + * 靠靠 + */ + @TableField(value = "DEV_INDEX") + private Integer devIndex; + + /** + * 靠靠 + */ + @TableField(value = "\"NAME\"") + private String name; + + /** + * PT靠靠 + */ + @TableField(value = "PT1") + private Double pt1; + + /** + * PT靠靠 + */ + @TableField(value = "PT2") + private Double pt2; + + /** + * CT靠靠 + */ + @TableField(value = "CT1") + private Double ct1; + + /** + * CT靠靠 + */ + @TableField(value = "CT2") + private Double ct2; + + /** + * 靠靠 + */ + @TableField(value = "DEVCMP") + private Double devcmp; + + /** + * 靠靠 + */ + @TableField(value = "DLCMP") + private Double dlcmp; + + /** + * 靠靠 + */ + @TableField(value = "JZCMP") + private Double jzcmp; + + /** + * 靠靠 + */ + @TableField(value = "XYCMP") + private Double xycmp; + + /** + * 靠?靠靠靠靠靠靠? + */ + @TableField(value = "SUBV_NO") + private Integer subvNo; + + /** + * (靠PQS_Dictionary?靠靠Guid + */ + @TableField(value = "\"SCALE\"") + private String scale; + + /** + * 靠靠 + */ + @TableField(value = "SUBV_NAME") + private String subvName; + + @TableField(exist = false) + private String subName; + + @TableField(exist = false) + private String deptName; +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/service/EventDataService.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/service/EventDataService.java new file mode 100644 index 0000000..2073d97 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/service/EventDataService.java @@ -0,0 +1,15 @@ +package com.njcn.kafka.event.service; + + +import com.njcn.kafka.event.pojo.param.PushParam; + +/** + * Description: + * Date: 2025/05/28 上午 8:38【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface EventDataService { + void push(PushParam pushParam) throws Exception; +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/service/KafkaProducerService.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/service/KafkaProducerService.java new file mode 100644 index 0000000..4be0a87 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/service/KafkaProducerService.java @@ -0,0 +1,15 @@ +package com.njcn.kafka.event.service; + +/** + * Description: + * Date: 2025/05/27 下午 1:57【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface KafkaProducerService { + void sendMessage(String topic, String message); + + // 带回调的发送 + void sendMessageWithCallback(String topic, String message); +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/service/impl/EventDataServiceImpl.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/service/impl/EventDataServiceImpl.java new file mode 100644 index 0000000..ce438c3 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/service/impl/EventDataServiceImpl.java @@ -0,0 +1,74 @@ +package com.njcn.kafka.event.service.impl; + + +import cn.hutool.json.JSONUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.njcn.kafka.event.mapper.OracleRmpEventDetailPOMapper; +import com.njcn.kafka.event.mapper.PqLineMapper; +import com.njcn.kafka.event.pojo.dto.EventPushDTO; +import com.njcn.kafka.event.pojo.dto.LedgerBaseInfoDTO; +import com.njcn.kafka.event.pojo.param.PushParam; +import com.njcn.kafka.event.pojo.po.OracleRmpEventDetailPO; +import com.njcn.kafka.event.service.EventDataService; +import com.njcn.kafka.event.service.KafkaProducerService; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + + +/** + * Description: + * Date: 2025/05/28 上午 8:38【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Service +@RequiredArgsConstructor +public class EventDataServiceImpl implements EventDataService { + + + private final KafkaProducerService kafkaProducerService; + private final OracleRmpEventDetailPOMapper oracleRmpEventDetailPOMapper; + private final PqLineMapper pqLineMapper; + + + @Override + public void push(PushParam pushParam) throws Exception { + LocalDateTime beginTime = pushParam.getBeginTime(); + LocalDateTime endTime = pushParam.getEndTime(); + List oracleRmpEventDetailPOList = oracleRmpEventDetailPOMapper.selectList( + new LambdaQueryWrapper() + .between(OracleRmpEventDetailPO::getCreateTime,beginTime,endTime)); + if(!CollectionUtils.isEmpty(oracleRmpEventDetailPOList)){ + List lineIds = oracleRmpEventDetailPOList.stream().map(OracleRmpEventDetailPO::getLineid).collect(Collectors.toList()); + List baseLedger = pqLineMapper.getBaseLedger(lineIds, null); + Map ledgerBaseInfoDTOMap = baseLedger.stream().collect(Collectors.toMap(LedgerBaseInfoDTO::getLineId, Function.identity())); + EventPushDTO eventPushDTO = new EventPushDTO(); + eventPushDTO.setTime(LocalDateTime.now()); + List collect = oracleRmpEventDetailPOList.stream().map(temp -> { + EventPushDTO.EventDTO eventDTO = new EventPushDTO.EventDTO(); + BeanUtils.copyProperties(temp,eventDTO); + eventDTO.setEventdetail_index(temp.getEventdetailIndex()); + eventDTO.setBusname(ledgerBaseInfoDTOMap.get(temp.getLineid()).getBusBarName()); + eventDTO.setBdname(ledgerBaseInfoDTOMap.get(temp.getLineid()).getStationName()); + eventDTO.setPointname(ledgerBaseInfoDTOMap.get(temp.getLineid()).getLineName()); + eventDTO.setGdname(ledgerBaseInfoDTOMap.get(temp.getLineid()).getGdName()); + return eventDTO; + }).collect(Collectors.toList()); + eventPushDTO.setBody(collect); + String jsonStr = JSONUtil.toJsonStr(collect); + + kafkaProducerService.sendMessage("eventdata",jsonStr); + } + + + } +} diff --git a/kafka-event-data/src/main/java/com/njcn/kafka/event/service/impl/KafkaProducerServiceImpl.java b/kafka-event-data/src/main/java/com/njcn/kafka/event/service/impl/KafkaProducerServiceImpl.java new file mode 100644 index 0000000..19d2b66 --- /dev/null +++ b/kafka-event-data/src/main/java/com/njcn/kafka/event/service/impl/KafkaProducerServiceImpl.java @@ -0,0 +1,50 @@ +package com.njcn.kafka.event.service.impl; + +import com.njcn.kafka.event.service.KafkaProducerService; +import lombok.RequiredArgsConstructor; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Service; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +/** + * Description: + * Date: 2025/05/27 下午 2:00【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Service +@RequiredArgsConstructor +public class KafkaProducerServiceImpl implements KafkaProducerService { + + private final KafkaTemplate kafkaTemplate; + + + @Override + public void sendMessage(String topic, String message) { + kafkaTemplate.send(topic, message); + } + @Override + // 带回调的发送 + public void sendMessageWithCallback(String topic, String message) { + ListenableFuture> future = kafkaTemplate.send(topic, message); + + future.addCallback(new ListenableFutureCallback>() { + + + @Override + public void onSuccess(SendResult result) { + System.out.println("Sent message=[" + message + + "] with offset=[" + result.getRecordMetadata().offset() + "]"); + } + + @Override + public void onFailure(Throwable ex) { + System.out.println("Unable to send message=[" + + message + "] due to : " + ex.getMessage()); + } + }); + } +} diff --git a/kafka-event-data/src/main/resources/application-dev.yml b/kafka-event-data/src/main/resources/application-dev.yml new file mode 100644 index 0000000..8c7ac55 --- /dev/null +++ b/kafka-event-data/src/main/resources/application-dev.yml @@ -0,0 +1,88 @@ +spring: + application: + name: kafka-event-data + kafka: + # Kafka 服务器地址 + bootstrap-servers: 192.168.2.130:9092 + + # 生产者配置 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + # 可选配置 + acks: all + retries: 3 + batch-size: 16384 + buffer-memory: 33554432 + + # 消费者配置 + consumer: + group-id: my-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + # 可选配置 + enable-auto-commit: false + max-poll-records: 500 + datasource: + dynamic: + primary: master + strict: false # 是否严格匹配数据源,默认false + druid: # 如果使用Druid连接池 + validation-query: SELECT 1 FROM DUAL # 达梦专用校验SQL + initial-size: 10 + # 初始化大小,最小,最大 + min-idle: 20 + maxActive: 500 + # 配置获取连接等待超时的时间 + maxWait: 60000 + # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + timeBetweenEvictionRunsMillis: 60000 + # 配置一个连接在池中最小生存的时间,单位是毫秒 + minEvictableIdleTimeMillis: 300000 + testWhileIdle: true + testOnBorrow: true + testOnReturn: false + # 打开PSCache,并且指定每个连接上PSCache的大小 + poolPreparedStatements: true + maxPoolPreparedStatementPerConnectionSize: 20 + datasource: + master: + url: jdbc:oracle:thin:@192.168.1.51:1521:pqsbase + username: pqsadmin_hn + password: pqsadmin + driver-class-name: oracle.jdbc.OracleDriver +# salve: +# driver-class-name: dm.jdbc.driver.DmDriver +# url: jdbc:dm://192.168.1.21:5236/PQSADMIN?useUnicode=true&characterEncoding=utf-8 +# username: PQSADMINLN +# password: Pqsadmin123 + + + + redis: + database: 10 + host: localhost + port: 6379 + timeout: 5000 + jedis: + pool: + max-active: 20 + max-wait: 8000 + max-idle: 8 + min-idle: 2 + test-on-borrow: true # 借出连接时验证 + test-on-return: true # 归还连接时验证 + test-while-idle: true # 空闲时验证 + +smsServer: + info: http://22.33.194.50:18096 + netInfo: http://22.33.191.206:18096 + account: xbjbpt + password: WLv8w071 +aliyun: + oss: + endpoint: oss-cn-beijing.aliyuncs.com + accessKeyId: LTAI5tQYuyu1PpiCdeM74PT6 + accessKeySecret: vTGHcQOCF9u7w9FL3HAHJO1oufVWru + bucketName: cn-comtrade diff --git a/kafka-event-data/src/main/resources/application.yml b/kafka-event-data/src/main/resources/application.yml new file mode 100644 index 0000000..6b37aec --- /dev/null +++ b/kafka-event-data/src/main/resources/application.yml @@ -0,0 +1,83 @@ +#当前服务的基本信息 +microservice: + ename: 12345 + name: 12345 +server: + port: 18094 +spring: + application: + name: kafka-event-data + profiles: + active: dev + + +#mybatis配置信息 +mybatis-plus: + mapper-locations: classpath*:com/njcn/**/mapping/*.xml + #别名扫描 + type-aliases-package: com.njcn.product.event.**.pojo + configuration: + #驼峰命名 + map-underscore-to-camel-case: true + #配置sql日志输出 + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl + #关闭日志输出 +# log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl + global-config: + db-config: + #指定主键生成策略 + id-type: assign_uuid + + +SYS_TYPE_ZT: 1cfcd6e2-c5fe-4b15-988a-32b90f1170c1 +SYS_TYPE_WT: 983f9dfe-4f9a-4c96-89d8-7d425a1f1d6c +db: + type: oracle + +#文件位置配置 +business: + #处理波形数据位置 + wavePath: D://Comtrade + targetPath: /pqmonitor + exportBaseDir: D://exportComtrade + eventCronExpression: 0 0/2 * * * ? + failCronExpression: 0 5/10 * * * ? + userCronExpression: 0 5 1 * * ? + sendMessageCronExpression : 0 */3 * * * ? + syncinterval: 2 + failsyncinterval: 1440 + #实时短信功能 + RealTimeSMSSwitch: false + #wavePath: /usr/local/comtrade + #处理临时数据 + tempPath: D://file + #tempPath: /usr/local/file + #文件存储的方式 3.本地存储 + file: + storage: 3 +#oss服务器配置 +min: + io: + endpoint: http://192.168.1.13:9009 + accessKey: minio + secretKey: minio@123 + bucket: excelreport + #华为obs服务器配置 +huawei: + access-key: J9GS9EA79PZ60OK23LWP + security-key: BirGrAFDSLxU8ow5fffyXgZRAmMRb1R1AdqCI60d + obs: + bucket: test-8601 + endpoint: https://obs.cn-east-3.myhuaweicloud.com + # 单位为秒 + expire: 3600 +#线程池配置信息 +threadPool: + corePoolSize: 10 + maxPoolSize: 20 + queueCapacity: 500 + keepAliveSeconds: 60 + + + + diff --git a/kafka-event-data/src/main/resources/logback.xml b/kafka-event-data/src/main/resources/logback.xml new file mode 100644 index 0000000..8e713c7 --- /dev/null +++ b/kafka-event-data/src/main/resources/logback.xml @@ -0,0 +1,145 @@ + + + + + + + + + + + + + + + + + + + + UTF-8 + %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + UTF-8 + + + + + + + + ${logHomeDir}/${log.projectName}/debug/debug.log + + + + + DEBUG + + ACCEPT + + DENY + + + + + + ${logHomeDir}/${log.projectName}/debug/debug.log.%d{yyyy-MM-dd}.%i.log + + 10MB + + ${log.maxHistory:-30} + + + + + + + + + + ${log.pattern} + + UTF-8 + + + + + + + INFO + ACCEPT + DENY + + + ${logHomeDir}/${log.projectName}/info/info.log + + + + ${logHomeDir}/${log.projectName}/info/info.log.%d{yyyy-MM-dd}.%i.log + + 10MB + ${log.maxHistory:-30} + + + + ${log.pattern} + + UTF-8 + + + + + + + + ${logHomeDir}/${log.projectName}/error/error.log + + + ERROR + ACCEPT + DENY + + + + ${logHomeDir}/${log.projectName}/error/error.log.%d{yyyy-MM-dd}.%i.log + + 10MB + ${log.maxHistory:-30} + + + + ${log.pattern} + + UTF-8 + + + + + + + + + + + + + + + + + + + + + + + diff --git a/kafka-event-data/src/main/resources/templates/hello.html b/kafka-event-data/src/main/resources/templates/hello.html new file mode 100644 index 0000000..8cb9705 --- /dev/null +++ b/kafka-event-data/src/main/resources/templates/hello.html @@ -0,0 +1,12 @@ + + + + + + + +

+ Hello! +

+ + \ No newline at end of file