Compare commits

..

4 Commits

Author SHA1 Message Date
76a254fef4 Merge remote-tracking branch 'origin/main' 2026-04-30 09:02:06 +08:00
6258c2dda5 feat(tools): 新增数据补录功能模块
- 实现批量写入组件,支持 INSERT IGNORE 操作和降级逐行写入
- 添加表定义注册器,自动解析 SQL 元数据并注册 13 张补数表
- 集成异步任务执行器,支持后台补数任务提交和状态管理
- 创建数据补录控制器,提供预估、创建和查询任务状态接口
- 实现时间槽计算和数据生成逻辑,支持多相别数据补录
- 添加线程池配置和任务状态持有器,确保任务并发执行安全
- 创建预览功能,估算补数任务的规模和影响范围
2026-04-30 09:01:43 +08:00
04e1dc2659 tools: refine add-data batch design spec 2026-04-28 12:43:48 +08:00
35471714c8 tools: add add-data batch design spec 2026-04-28 12:25:19 +08:00
35 changed files with 3403 additions and 0 deletions

View File

@@ -33,6 +33,7 @@ Java 源码位于 `src/main/java`,配置文件位于 `src/main/resources`My
## 代码风格与命名规范
保持现有 Java 风格4 空格缩进、UTF-8 文件编码、基础包名使用 `com.njcn.gather`。命名沿用分层后缀,如 `*Controller``*Service``*ServiceImpl``*Mapper``*Param``*PO``*VO`。优先复用现有 Lombok 注解,如 `@Data``@RequiredArgsConstructor``@Slf4j`。Mapper XML 文件名应与接口名保持一致。业务代码中,关键流程、分支判断、状态流转或容易误解的节点需要补充简洁的中文注释,但不要添加无信息量的注释。
- 参照 `user` 模块的组织方式Controller 与 Service 都按职责拆分;不同职责的方法放到不同 `*Controller``*Service``*ServiceImpl` 中,同一模块后续新增方法也要沿既有职责边界归类,不再回退为单一大类承载全部接口。
## 数据与 SQL 约束
- 新增业务表的 DO 优先复用当前 `BaseDO` / 审计字段风格;除非表本身明确不需要逻辑删除,不要再引入另一套审计基类。

View File

@@ -0,0 +1,789 @@
# 电能质量 13 张表批量补数设计说明
## 1. 背景
`tools/add-data` 当前只有模块骨架和历史 SQL 文件 [DATA_FLICKER.sql](D:/Work/SourceCode/CN_Tool/tools/add-data/DATA_FLICKER.sql),尚未提供真实业务接口。
本次需求是在 `add-data` 模块中补齐一套“按时间区间批量生成电能质量数据并写入数据库”的能力,满足以下业务目标:
- 支持按时间区间补数,例如一天、一月。
- 支持时间戳步长选择:`1``3``5``10` 分钟。
- 支持单监测点和多监测点,前端可选择监测点 ID默认单监测点。
- 支持批量写入,优先保证大区间补数时的效率。
- 同一主键数据已存在时跳过,不覆盖、不删除。
- 同步提供前端页面交互方案与参数展示规则。
## 2. 范围确认
根据当前 [DATA_FLICKER.sql](D:/Work/SourceCode/CN_Tool/tools/add-data/DATA_FLICKER.sql) 内容,本次补数范围按 13 张表处理,而不是 14 张表:
1. `data_flicker`
2. `data_fluc`
3. `data_harmphasic_i`
4. `data_harmphasic_v`
5. `data_harmpower_p`
6. `data_harmpower_q`
7. `data_harmpower_s`
8. `data_harmrate_i`
9. `data_harmrate_v`
10. `data_i`
11. `data_inharm_i`
12. `data_plt`
13. `data_v`
## 3. 已确认需求
本次设计基于以下已确认结论:
- 写入方式是“按时间戳步长生成整段数据”,不是定时常驻写库。
- 重复补数时采用“跳过已存在数据”策略。
- 前端方案需要和后端接口一起设计。
- 分表时间轴规则已确认:
- `data_flicker` 固定每 `10` 分钟写一组
- `data_fluc` 固定每 `10` 分钟写一组
- `data_plt` 固定每 `2` 小时写一组
- 其余 `10` 张表按前端选择的 `1``3``5``10` 分钟步长写入
- 所有时间点都按自然时间槽对齐,采用整点整分方式生成,不按传入开始时间顺延
- 展示字段需要覆盖:
- `电能质量参数名称`
- `相别`
- `是否合格`
- `最大值`
- `最小值`
- `平均值`
- `95%概率大值`
- `数据展示(保留小数)`
- 数值展示默认保留 `2` 位小数。
- 相别参照用户提供图片中的“相别”列。
## 4. 关键约束
### 4.1 主键约束
13 张表均以 `LINEID + TIMEID + PHASIC_TYPE` 作为主键,重复写入时必须避免主键冲突导致整批失败。
### 4.2 相别字段长度约束
`PHASIC_TYPE` 字段长度为 `varchar(2)`。因此前端展示中的相别值和数据库实际落库值不能简单等同:
- 页面展示可使用 `T``ABC``AB/BC/CA`
- 落库时必须拆分为真实可存储的 `PHASIC_TYPE`
### 4.3 数据量约束
当时间区间为“一月”、步长为“1 分钟”、监测点为“多选”时,补数规模会非常大,不能采用单条插入或一次性全量加载入内存的方式。
### 4.4 现有仓库约束
- 当前仓库是后端工程,没有现成前端代码,本次只输出前端交互方案和接口契约。
- 默认不执行 `mvn` 编译、打包、测试。
- 修改范围应尽量收敛在 `tools/add-data` 模块内。
## 5. 方案选择
本次比较三种实现路径:
### 方案 A同步接口直接写库
特点:
- 控制器接收参数后直接在请求线程中完成全部写库。
- 实现最简单。
问题:
- 大区间、多监测点时容易超时。
- 任务执行中断后难以定位进度。
- 前端体验差,无法明确看到当前执行状态。
结论:
不采用。
### 方案 B异步任务 + JDBC 批量写入
特点:
- 请求只负责创建任务,后台异步执行。
- 任务状态可轮询。
- 批量写入时可按表、按批次提交。
优势:
- 更适合月级、多监测点补数。
- 可以精细控制批次大小。
- 当前数据源连接串已开启 `rewriteBatchedStatements=true`,适合批处理。
结论:
采用该方案。
### 方案 C只生成 SQL 文件,人工执行
特点:
- 后端只生成 SQL 脚本,不直接写库。
问题:
- 需要人工执行,流程割裂。
- 不符合“前端选择后直接补数”的目标。
结论:
不采用。
## 6. 总体设计
### 6.1 设计目标
`tools/add-data` 中新增一套“批量补数任务”能力,支持:
- 前端提交补数参数
- 后端预估本次写入规模
- 后端异步生成数据并按表批量写入
- 前端轮询任务状态
- 前端获取参数展示规则
### 6.2 模块边界
本次能力只落在 `tools/add-data`,不向 `system``user``detection` 扩散业务逻辑。
如需监测点列表,首版不假设存在基础监测点表,也不新增点位管理能力,直接由前端传入选中的 `lineIds`
### 6.3 接口拆分
建议把“任务管理”和“参数模板展示”分成两类接口:
- 补数任务接口:负责预估、创建、状态查询
- 模板展示接口:负责返回图片对应的参数展示规则
## 7. 数据模型与相别规则
### 7.1 展示相别与落库相别分离
前端展示相别来自用户图片中的“相别”列,但落库时需要展开成真正的 `PHASIC_TYPE` 值。
建议在模板中同时维护:
- `phaseDisplay`:前端展示值
- `phaseCodes`:落库使用的真实相别集合
### 7.2 相别展开规则
建议默认规则如下:
- `T` -> `["T"]`
- `ABC` -> `["A", "B", "C"]`
- `AB/BC/CA` -> `["AB", "BC", "CA"]`
这套规则同时用于:
- 决定每个参数需要展开多少条写库数据
- 决定前端参数表如何显示相别列
### 7.3 参数模板职责
参数模板不直接等于数据库表结构,而是承担以下职责:
- 描述前端展示行
- 绑定该参数对应的数据表
- 描述展示相别和落库相别
- 约定是否展示“是否合格”
- 约定最大值、最小值、平均值、95%概率大值的展示逻辑
- 约定保留小数位数,默认 `2`
## 8. 后端接口设计
### 8.1 预估接口
接口:
- `POST /addData/task/preview`
作用:
- 根据 `lineIds`、时间区间、时间戳步长,返回本次补数的预估规模
入参:
- `lineIds`:监测点 ID 列表
- `startTime`:开始时间
- `endTime`:结束时间
- `intervalMinutes``1``3``5``10`
返回内容:
- 时间点数量
- 监测点数量
- 每张表预估写入条数
- 总预估条数
说明:
- 预估逻辑必须按分表时间轴分别计算。
- `intervalMinutes` 只影响 `data_harmphasic_i``data_harmphasic_v``data_harmpower_p``data_harmpower_q``data_harmpower_s``data_harmrate_i``data_harmrate_v``data_i``data_inharm_i``data_v``10` 张表。
- `data_flicker``data_fluc` 固定按 `10` 分钟时间槽计算。
- `data_plt` 固定按 `2` 小时时间槽计算。
### 8.2 创建任务接口
接口:
- `POST /addData/task/create`
作用:
- 创建一个批量补数任务,并立即进入后台执行
入参:
- `lineIds`
- `startTime`
- `endTime`
- `intervalMinutes`
返回内容:
- `taskId`
- 初始状态
说明:
- 前端默认提交单个 `lineId`
- 多监测点时前端直接提交多个 `lineIds`
- 不单独拆分“单点接口”和“多点接口”
- `intervalMinutes` 仅作为 `10` 张基础实时类表的生成步长
- 固定频率表仍按各自时间轴生成,不跟随 `intervalMinutes`
### 8.3 任务状态接口
接口:
- `GET /addData/task/status/{taskId}`
作用:
- 供前端轮询当前任务执行状态
返回内容:
- 任务状态:`WAITING``RUNNING``SUCCESS``FAILED`
- 当前执行表名
- 当前批次信息
- 已写入条数
- 已跳过条数
- 失败条数
- 失败原因
- 开始时间、结束时间
### 8.4 参数模板接口
接口:
- `GET /addData/template/list`
作用:
- 返回前端页面参数规则表所需的静态配置
返回内容:
- 电能质量参数名称
- 相别展示值
- 是否展示“是否合格”
- 最大值规则
- 最小值规则
- 平均值规则
- 95%概率大值规则
- 保留小数位数
## 9. 后端分层设计
### 9.1 Controller 层
建议新增两个控制器:
- `AddDataTaskController`
- 补数任务相关接口
- `AddDataTemplateController`
- 参数模板查询接口
### 9.2 Service 层
建议拆分为:
- `AddDataTaskService`
- 创建任务
- 状态查询
- 预估规模
- `AddDataTemplateService`
- 返回前端参数模板
### 9.3 Component 层
建议新增以下组件:
- `AddDataTaskExecutor`
- 负责异步执行任务
- `AddDataBatchWriter`
- 负责按表批量写入
- `AddDataValueGenerator`
- 负责根据模板生成每条记录的值
- `AddDataTaskStatusHolder`
- 首版用内存保存任务状态
- `AddDataTemplateRegistry`
- 持有参数模板与相别映射
### 9.4 数据访问层
本次不建议为 13 张表分别创建超长 MyBatis XML 批量插入语句。
原因如下:
- 宽表字段极多XML 可维护性差。
- 13 张表结构差异大,重复代码很多。
- 本次更偏工具型批处理写数,不适合以业务实体方式逐表建模。
建议采用:
- `JdbcTemplate`
- `PreparedStatement`
- `batchUpdate`
在写入 SQL 上统一使用:
- `INSERT IGNORE`
以满足“跳过已存在数据”的业务要求。
## 10. 批量写入策略
### 10.1 总体原则
批量写入必须满足以下原则:
- 不先查存在再插入
- 不整段数据一次性全部放入内存
- 按表、按批次逐步提交
- 主键冲突自动跳过
### 10.2 跳过已存在数据
采用 `INSERT IGNORE` 而不是以下策略:
- 不采用“先查后写”,避免额外查询放大数据库压力
- 不采用“覆盖更新”,与已确认需求不符
- 不采用“先删后写”,避免误删
### 10.3 流式生成
建议按如下顺序流式生成:
1. 先遍历时间点
2. 再遍历监测点
3. 再按模板展开相别
4. 最后路由到对应表的批缓存
达到批次阈值后立即落库并清空当前缓存。
### 10.4 分批阈值
建议根据表宽度分层设置批次大小:
- 窄表:`500 ~ 1000` 行/批
- 宽表:`100 ~ 200` 行/批
其中以下表建议视为宽表:
- `data_harmphasic_i`
- `data_harmphasic_v`
- `data_harmpower_p`
- `data_harmpower_q`
- `data_harmpower_s`
- `data_harmrate_i`
- `data_harmrate_v`
- `data_i`
- `data_inharm_i`
- `data_v`
### 10.5 任务统计
任务执行过程中需累计以下统计值:
- `insertedCount`
- `skippedCount`
- `failedCount`
其中:
- `insertedCount` 表示本次真正插入成功的行数
- `skippedCount` 表示因主键重复被忽略的行数
- `failedCount` 表示非主键冲突导致的失败数量
## 11. 数据生成策略
### 11.1 首版原则
首版不引入复杂公式配置,不让前端传计算表达式。数据生成规则由后端内置。
### 11.2 分表时间轴规则
本次补数不采用“全部表共用同一时间轴”的方式,而是拆成三类时间轴:
1. 固定 `10` 分钟时间轴
2. 固定 `2` 小时时间轴
3. 用户步长时间轴
具体规则如下:
- `data_flicker`
- 固定每 `10` 分钟一组
- 按自然时间槽对齐,例如 `10:00``10:10``10:20`
- `data_fluc`
- 固定每 `10` 分钟一组
- 按自然时间槽对齐
- `data_plt`
- 固定每 `2` 小时一组
- 按自然时间槽对齐,例如 `00:00``02:00``04:00`
- 其余 `10` 张表
- 按前端传入的 `intervalMinutes` 生成
- 允许值仅为 `1``3``5``10`
- 同样按自然时间槽对齐
如果前端传入 `startTime=10:07`
- `10` 分钟表应从 `10:10` 开始
- `2` 小时表应从 `12:00` 或下一个符合槽位的时刻开始
- 用户步长表应从不小于 `startTime` 的下一个对应槽位开始
### 11.3 统一生成方式
建议采用“基础状态生成 + 派生字段回填 + 分表时间轴裁切”的方式:
- 先按 `(lineId, timeId)` 生成基础电气状态
- 再从基础状态派生电压、电流、谐波、功率、闪变
- 最后按各表自己的时间轴决定是否落库
统一原则如下:
- 同一时间点的 13 张表数据必须同源,不允许各表独立随机
- 主值字段按基准值和受控扰动生成
- 最大值字段不小于主值
- 最小值字段不大于主值
- `95%概率大值` 对应 `CP95` 字段
- 宽表中的谐波分量和派生列按统一规则回填
- 同一输入参数应尽量生成可复现的结果,不建议使用完全无约束随机数
### 11.4 逐表生成规律
#### 11.4.1 基础源表
`data_v` 作为电压源表,优先生成:
- 先生成 `FREQ`
- 再生成 `RMS`
- 再派生 `RMSAB``RMSBC``RMSCA`
- 再派生 `VU_DEV``VL_DEV``FREQ_DEV`
- 再派生 `V_POS``V_NEG``V_ZERO``V_UNBALANCE`
- 最后生成 `V_1 ~ V_50`
- `V_THD``V_2 ~ V_50``V_1` 计算
`data_i` 作为电流源表,与 `data_v` 保持同源:
- 先生成 `RMS`
- 再生成 `I_POS``I_NEG``I_ZERO``I_UNBALANCE`
- 再生成 `I_1 ~ I_50`
- `I_THD``I_2 ~ I_50``I_1` 计算
对这两张源表:
- `RMS_MAX``RMS_MIN``RMS_CP95`
- `FREQ_MAX``FREQ_MIN``FREQ_CP95`
- `V_THD_MAX``V_THD_MIN``V_THD_CP95`
- `I_THD_MAX``I_THD_MIN``I_THD_CP95`
均必须由主值派生,不单独随机。
#### 11.4.2 谐波幅值表
`data_harmphasic_v`
- 直接复用 `data_v` 中的 `V_1 ~ V_50`
- 各次谐波的 `MAX``MIN``CP95``data_v` 同源派生
`data_harmphasic_i`
- 直接复用 `data_i` 中的 `I_1 ~ I_50`
- 各次谐波的 `MAX``MIN``CP95``data_i` 同源派生
#### 11.4.3 谐波占比表
`data_harmrate_v`
-`data_harmphasic_v` 派生
- 建议按 `V_n / V_1 * 100` 计算 `V_1 ~ V_50` 对应占比值
- `MAX``MIN``CP95` 也按同样比例换算
`data_harmrate_i`
-`data_harmphasic_i` 派生
- 建议按 `I_n / I_1 * 100` 计算 `I_1 ~ I_50` 对应占比值
- `MAX``MIN``CP95` 也按同样比例换算
#### 11.4.4 间谐波电流表
`data_inharm_i`
- 作为电流间谐波表生成
- 各阶值应明显小于对应整数次谐波值
- 建议按“整数次谐波值乘以小比例系数”生成
- `MAX``MIN``CP95` 继续由主值派生
#### 11.4.5 谐波功率表
`data_harmpower_p`
- 从电压谐波和电流谐波联合派生
- `P_1 ~ P_50` 建议按电压谐波、电流谐波和相位关系计算
- `P` 为各次分量汇总
- `PF``DF` 由功率关系派生
`data_harmpower_q`
-`data_harmpower_p` 同源
- `Q_1 ~ Q_50` 按无功功率关系派生
- `Q` 为各次分量汇总
`data_harmpower_s`
-`data_harmpower_p``data_harmpower_q` 同源
- `S_1 ~ S_50` 建议由 `P_n``Q_n` 派生
- `S` 为各次分量汇总
这三张表中的 `MAX``MIN``CP95` 也必须基于对应主值派生。
#### 11.4.6 闪变与波动表
`data_flicker`
- 固定 `10` 分钟时间轴
- 生成 `FLUC``PST``PLT`
- `FLUC` 作为波动主值
- `PST``FLUC` 派生
- `PLT` 作为长时闪变指标生成,但落库节奏仍为 `10` 分钟
`data_fluc`
- 固定 `10` 分钟时间轴
- 生成 `FLUC``FLUCCF`
-`data_flicker` 同源
- `FLUCCF` 作为 `FLUC` 的修正或归一化结果生成
`data_plt`
- 固定 `2` 小时时间轴
- 只写 `PLT`
- `PLT` 来源应与 `data_flicker` 中的 `PLT` 同源
- 但只在 `2` 小时槽位落表
### 11.5 前端展示字段映射
由于表结构中没有独立 `AVG` 字段,前端“平均值”首版建议直接取各参数主值字段。
例如:
- 电压类参数取 `RMS``V_THD``FREQ`
- 电流类参数取 `RMS``I_THD`
- 闪变类参数取 `FLUC``PST``PLT`
- 功率类参数取 `P``Q``S`
前端展示中的:
- `最大值` -> 对应 `*_MAX`
- `最小值` -> 对应 `*_MIN`
- `95%概率大值` -> 对应 `*_CP95`
- `平均值` -> 对应主值字段
### 11.6 质量标识
`QUALITYFLAG` 首版建议统一使用固定有效值,例如 `1`
前端“是否合格”不直接读取数据库 `QUALITYFLAG` 的原始含义,而是走模板展示规则,避免把数据生成逻辑和展示文案硬耦合。
## 12. 前端页面交互方案
### 12.1 页面结构
建议页面分为两个区域:
1. 补数任务区
2. 参数规则展示区
### 12.2 补数任务区
表单项建议如下:
- 监测点选择模式:`单点` / `多点`
- 监测点 ID 选择框
- 开始时间
- 结束时间
- 时间戳步长:`1``3``5``10` 分钟
- 预计写入量按钮
- 开始补数按钮
交互说明:
- 默认单监测点
- 多监测点时切换为多选
- 时间戳步长只影响 `10` 张基础实时类表
- `data_flicker``data_fluc` 固定按 `10` 分钟槽位写入
- `data_plt` 固定按 `2` 小时槽位写入
- 前端先调用预估接口,展示预计写入规模
- 用户确认后再调用创建任务接口
### 12.3 任务状态展示
前端创建任务后轮询状态接口,建议展示:
- 当前状态
- 当前表名
- 已写入数量
- 已跳过数量
- 失败数量
- 失败原因
- 开始时间
- 结束时间
### 12.4 参数规则展示区
参数规则表按用户提供图片组织,至少展示以下列:
- `电能质量参数名称`
- `相别`
- `显示`
- `最大值`
- `最小值`
- `平均值`
- `95%概率大值`
- `是否合格`
- `数据展示(保留小数)`
其中:
- 相别列使用 `phaseDisplay`
- 所有数值默认保留 `2` 位小数
- 该表展示的是模板规则,不是实时数据库统计结果
## 13. 文件结构建议
建议在 `tools/add-data` 下按现有仓库风格扩展:
```text
tools/add-data/src/main/java/com/njcn/gather/tool/adddata/
├── controller
│ ├── AddDataTaskController.java
│ └── AddDataTemplateController.java
├── service
│ ├── AddDataTaskService.java
│ ├── AddDataTemplateService.java
│ └── impl
│ ├── AddDataTaskServiceImpl.java
│ └── AddDataTemplateServiceImpl.java
├── component
│ ├── AddDataTaskExecutor.java
│ ├── AddDataBatchWriter.java
│ ├── AddDataValueGenerator.java
│ ├── AddDataTaskStatusHolder.java
│ └── AddDataTemplateRegistry.java
└── pojo
├── param
└── vo
```
如参数模板需落成资源文件,建议放在:
- `tools/add-data/src/main/resources/template/add-data-parameter-template.json`
## 14. 非目标
本次不包含以下内容:
- 不新增监测点基础信息管理能力
- 不新增监测点字典表
- 不新增数据库迁移框架
- 不在 `system``user` 模块中扩展通用任务中心
- 不实现真实前端页面代码
- 不做覆盖更新或先删后写逻辑
## 15. 风险与取舍
### 15.1 内存态任务状态的风险
首版如果使用内存保存任务状态:
- 服务重启后任务状态会丢失
- 适合当前工具型场景
- 不适合后续演进为长期审计能力
当前取舍:
- 首版接受该限制
- 如果后续需要审计与追踪,再补任务表
### 15.2 模板与真实表字段映射的风险
用户图片中的参数项比“13 张表名”更偏业务视角,实际落库需要一层参数模板到表结构的映射。
当前取舍:
- 首版把模板规则集中维护在 `AddDataTemplateRegistry`
- 不在前端直接拼接数据库字段名
### 15.3 大批量写入的数据库压力
多监测点、长区间、1 分钟步长时数据库压力会很大。
当前取舍:
- 采用预估接口提前提示规模
- 控制批次大小
- 使用异步执行与 `INSERT IGNORE`
## 16. 验证方式
本次设计和后续实现默认不执行 `mvn`,验证方式以静态检查和链路闭合检查为主。
### 16.1 设计验证
确认以下设计点闭合:
- 13 张表范围明确
- 相别展示与落库规则明确
- 重复数据处理策略明确
- 前后端接口契约明确
- 批量写入策略明确
### 16.2 实现后验证
后续实现时重点检查:
- 入参校验是否覆盖时间区间、步长、监测点 ID 列表
- 预估条数计算是否正确
- SQL 是否统一使用 `INSERT IGNORE`
- 批次是否按宽表和窄表区分
- 重复补数时是否正确累加 `skippedCount`
- 参数模板返回是否与图片字段一致
## 17. 结论
本次采用“异步任务 + JDBC 批量写入 + 参数模板展示接口”的实现方案,在 `tools/add-data` 模块内完成电能质量 13 张表的批量补数能力。
该方案兼顾以下目标:
- 支持单监测点和多监测点
- 支持日级、月级区间
- 支持 1/3/5/10 分钟时间戳步长
- 支持重复补数时跳过已存在数据
- 支持前端按图片规则展示参数和相别
- 在当前仓库约束下把改动范围控制在最小必要集合

View File

@@ -48,6 +48,11 @@
<artifactId>mms-mapping</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.njcn.gather</groupId>
<artifactId>add-data</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>

41
tools/add-data/pom.xml Normal file
View File

@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.njcn.gather</groupId>
<artifactId>tools</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>add-data</artifactId>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>njcn-common</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>spingboot2.3.12</artifactId>
<version>2.3.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,135 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataBatchWriteResult;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* 批量写入组件。
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class AddDataBatchWriter {
/** JDBC 模板。 */
private final JdbcTemplate jdbcTemplate;
/**
* 写入一个批次。
*
* @param definition 表定义
* @param rows 行数据
* @return 写入结果
*/
public AddDataBatchWriteResult writeBatch(AddDataTableDefinition definition, List<List<Object>> rows) {
if (rows == null || rows.isEmpty()) {
return new AddDataBatchWriteResult(0L, 0L, 0L, null);
}
try {
return executeInsertIgnore(definition, rows);
} catch (DataAccessException ex) {
log.warn("批量写入失败开始降级为逐行写入table={}, batchSize={}, message={}",
definition.getTableName(), rows.size(), resolveErrorMessage(ex));
return fallbackWriteOneByOne(definition, rows, ex);
}
}
/**
* 执行批量 INSERT IGNORE。
*
* @param definition 表定义
* @param rows 行数据
* @return 写入结果
*/
private AddDataBatchWriteResult executeInsertIgnore(AddDataTableDefinition definition, List<List<Object>> rows) {
String sql = buildInsertIgnoreSql(definition, rows.size());
List<Object> args = flattenRows(rows);
int affectedRows = jdbcTemplate.update(sql, args.toArray(new Object[0]));
long insertedCount = Math.max(affectedRows, 0);
long skippedCount = rows.size() - insertedCount;
return new AddDataBatchWriteResult(insertedCount, skippedCount, 0L, null);
}
/**
* 批量写入失败后逐行降级。
*
* @param definition 表定义
* @param rows 行数据
* @param batchException 原始批量异常
* @return 写入结果
*/
private AddDataBatchWriteResult fallbackWriteOneByOne(AddDataTableDefinition definition, List<List<Object>> rows, DataAccessException batchException) {
long insertedCount = 0L;
long skippedCount = 0L;
long failedCount = 0L;
String firstFailureMessage = null;
String sql = buildInsertIgnoreSql(definition, 1);
for (List<Object> row : rows) {
try {
int affectedRows = jdbcTemplate.update(sql, row.toArray(new Object[0]));
if (affectedRows > 0) {
insertedCount += affectedRows;
} else {
skippedCount++;
}
} catch (DataAccessException ex) {
failedCount++;
if (firstFailureMessage == null) {
firstFailureMessage = resolveErrorMessage(ex);
}
}
}
if (firstFailureMessage == null) {
firstFailureMessage = resolveErrorMessage(batchException);
}
return new AddDataBatchWriteResult(insertedCount, skippedCount, failedCount, firstFailureMessage);
}
/**
* 构建 INSERT IGNORE SQL。
*
* @param definition 表定义
* @param rowCount 行数
* @return SQL 文本
*/
private String buildInsertIgnoreSql(AddDataTableDefinition definition, int rowCount) {
String columnSegment = "`" + String.join("`,`", definition.getColumns()) + "`";
String placeholderSegment = "(" + String.join(",", Collections.nCopies(definition.getColumns().size(), "?")) + ")";
String valuesSegment = String.join(",", Collections.nCopies(rowCount, placeholderSegment));
return "INSERT IGNORE INTO `" + definition.getTableName() + "` (" + columnSegment + ") VALUES " + valuesSegment;
}
/**
* 展平批量参数。
*
* @param rows 行数据
* @return 扁平参数列表
*/
private List<Object> flattenRows(List<List<Object>> rows) {
List<Object> args = new ArrayList<Object>();
for (List<Object> row : rows) {
args.addAll(row);
}
return args;
}
/**
* 提取错误信息。
*
* @param ex 异常
* @return 错误信息
*/
private String resolveErrorMessage(DataAccessException ex) {
Throwable root = ex.getMostSpecificCause();
return root == null ? ex.getMessage() : root.getMessage();
}
}

View File

@@ -0,0 +1,152 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 数据补录表定义注册器。
*/
@Slf4j
@Component
public class AddDataTableRegistry implements InitializingBean {
/** 元数据 SQL 资源。 */
private static final String SCHEMA_RESOURCE_PATH = "sql/add-data/DATA_FLICKER.sql";
/** 建表语句解析表达式。 */
private static final Pattern TABLE_PATTERN = Pattern.compile("CREATE TABLE\\s+`([^`]+)`\\s*\\((.*?)\\)\\s*ENGINE",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
/** 列定义解析表达式。 */
private static final Pattern COLUMN_PATTERN = Pattern.compile("^\\s*`([^`]+)`", Pattern.MULTILINE);
/** 首版统一按 A/B/C/T 四类相别落库。 */
private static final List<String> WRITE_PHASE_CODES = Collections.unmodifiableList(Arrays.asList("A", "B", "C", "T"));
/** 宽表批次大小。 */
private static final int WIDE_TABLE_BATCH_SIZE = 100;
/** 窄表批次大小。 */
private static final int NARROW_TABLE_BATCH_SIZE = 500;
/** 表定义列表。 */
private List<AddDataTableDefinition> tableDefinitions = Collections.emptyList();
@Override
public void afterPropertiesSet() throws Exception {
Map<String, List<String>> columnMap = parseSchemaColumns();
List<AddDataTableDefinition> definitions = new ArrayList<AddDataTableDefinition>();
definitions.add(buildDefinition(columnMap, "data_flicker", AddDataTableDefinition.TimeAxisType.FIXED_TEN_MINUTES, NARROW_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_fluc", AddDataTableDefinition.TimeAxisType.FIXED_TEN_MINUTES, NARROW_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmphasic_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmphasic_v", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmpower_p", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmpower_q", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmpower_s", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmrate_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmrate_v", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_inharm_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_plt", AddDataTableDefinition.TimeAxisType.FIXED_TWO_HOURS, NARROW_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_v", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
tableDefinitions = Collections.unmodifiableList(definitions);
log.info("加载 add-data 表定义完成tableCount={}", tableDefinitions.size());
}
/**
* 返回全部表定义。
*
* @return 表定义列表
*/
public List<AddDataTableDefinition> getTableDefinitions() {
return tableDefinitions;
}
/**
* 根据表名查找表定义。
*
* @param tableName 表名
* @return 表定义
*/
public AddDataTableDefinition getDefinition(String tableName) {
for (AddDataTableDefinition definition : tableDefinitions) {
if (definition.getTableName().equals(tableName)) {
return definition;
}
}
throw new IllegalArgumentException("未找到 add-data 表定义:" + tableName);
}
/**
* 解析 schema SQL 中的列定义。
*
* @return 表名与字段列表映射
*/
private Map<String, List<String>> parseSchemaColumns() {
String schemaText = loadSchemaText();
Matcher tableMatcher = TABLE_PATTERN.matcher(schemaText);
Map<String, List<String>> columnMap = new LinkedHashMap<String, List<String>>();
while (tableMatcher.find()) {
String tableName = tableMatcher.group(1);
String tableBody = tableMatcher.group(2);
Matcher columnMatcher = COLUMN_PATTERN.matcher(tableBody);
List<String> columns = new ArrayList<String>();
while (columnMatcher.find()) {
columns.add(columnMatcher.group(1));
}
columnMap.put(tableName, columns);
}
return columnMap;
}
/**
* 创建单表定义。
*
* @param columnMap 字段映射
* @param tableName 表名
* @param timeAxisType 时间轴类型
* @param batchSize 批次大小
* @return 表定义
*/
private AddDataTableDefinition buildDefinition(Map<String, List<String>> columnMap, String tableName,
AddDataTableDefinition.TimeAxisType timeAxisType, int batchSize) {
List<String> columns = columnMap.get(tableName);
if (columns == null || columns.isEmpty()) {
throw new IllegalStateException("未从 SQL 元数据中解析到表字段:" + tableName);
}
/*
* 当前按用户最新确认13 张表统一支持 A/B/C/T 四类数据类型。
* 如果后续补齐逐表更细的相别映射,只需要在这里调整落库相别集合即可。
*/
return new AddDataTableDefinition(tableName, columns, WRITE_PHASE_CODES, batchSize, timeAxisType);
}
/**
* 加载 schema SQL 文本。
*
* @return SQL 文本
*/
private String loadSchemaText() {
ClassPathResource resource = new ClassPathResource(SCHEMA_RESOURCE_PATH);
try {
return StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new IllegalStateException("读取 add-data SQL 元数据失败:" + SCHEMA_RESOURCE_PATH, e);
}
}
}

View File

@@ -0,0 +1,310 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataBatchWriteResult;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
/**
* 异步任务执行器。
*/
@Slf4j
@Component
public class AddDataTaskExecutor {
/** 多个时间点组成一个处理窗口,兼顾批量效率和内存占用。 */
private static final int TIME_WINDOW_SIZE = 30;
/** 任务线程池。 */
@Qualifier("addDataTaskExecutorService")
private final ExecutorService addDataTaskExecutorService;
/** 表定义注册器。 */
private final AddDataTableRegistry addDataTableRegistry;
/** 时间槽计算器。 */
private final AddDataTimeSlotCalculator addDataTimeSlotCalculator;
/** 数据生成器。 */
private final AddDataValueGenerator addDataValueGenerator;
/** 批量写入组件。 */
private final AddDataBatchWriter addDataBatchWriter;
/** 状态持有器。 */
private final AddDataTaskStatusHolder addDataTaskStatusHolder;
public AddDataTaskExecutor(@Qualifier("addDataTaskExecutorService") ExecutorService addDataTaskExecutorService,
AddDataTableRegistry addDataTableRegistry,
AddDataTimeSlotCalculator addDataTimeSlotCalculator,
AddDataValueGenerator addDataValueGenerator,
AddDataBatchWriter addDataBatchWriter,
AddDataTaskStatusHolder addDataTaskStatusHolder) {
this.addDataTaskExecutorService = addDataTaskExecutorService;
this.addDataTableRegistry = addDataTableRegistry;
this.addDataTimeSlotCalculator = addDataTimeSlotCalculator;
this.addDataValueGenerator = addDataValueGenerator;
this.addDataBatchWriter = addDataBatchWriter;
this.addDataTaskStatusHolder = addDataTaskStatusHolder;
}
/**
* 提交后台任务。
*
* @param taskId 任务编号
* @param command 任务命令
*/
public void submit(String taskId, AddDataTaskCommand command) {
addDataTaskExecutorService.submit(() -> execute(taskId, command));
}
/**
* 执行补数任务。
*
* @param taskId 任务编号
* @param command 任务命令
*/
private void execute(String taskId, AddDataTaskCommand command) {
try {
addDataTaskStatusHolder.markRunning(taskId);
Map<String, List<LocalDateTime>> timeSlotsByTable = buildTimeSlotsByTable(command);
Map<String, Set<LocalDateTime>> timeSlotLookupByTable = buildTimeSlotLookupByTable(timeSlotsByTable);
List<LocalDateTime> mergedTimeSlots = mergeTimeSlots(timeSlotsByTable);
Map<String, Integer> batchNoMap = new HashMap<String, Integer>();
Map<String, List<List<Object>>> pendingRowsByTable = buildPendingRowsByTable();
for (int startIndex = 0; startIndex < mergedTimeSlots.size(); startIndex += TIME_WINDOW_SIZE) {
int endIndex = Math.min(startIndex + TIME_WINDOW_SIZE, mergedTimeSlots.size());
List<LocalDateTime> timeWindow = mergedTimeSlots.subList(startIndex, endIndex);
GeneratedBatchData generatedBatchData = generateBatchData(command, timeWindow, timeSlotLookupByTable);
if (!writeBatchData(taskId, generatedBatchData, pendingRowsByTable, batchNoMap)) {
return;
}
}
if (!flushRemainingBatchData(taskId, pendingRowsByTable, batchNoMap)) {
return;
}
addDataTaskStatusHolder.markSuccess(taskId);
} catch (Exception ex) {
log.error("执行补数任务失败taskId={}", taskId, ex);
addDataTaskStatusHolder.markFailed(taskId, ex.getMessage());
}
}
/**
* 为每张表构建各自命中的时间槽。
*
* @param command 任务命令
* @return 按表名分组的时间槽
*/
private Map<String, List<LocalDateTime>> buildTimeSlotsByTable(AddDataTaskCommand command) {
Map<String, List<LocalDateTime>> result = new LinkedHashMap<String, List<LocalDateTime>>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
int intervalMinutes = definition.resolveIntervalMinutes(command.getIntervalMinutes());
List<LocalDateTime> timeSlots = addDataTimeSlotCalculator.buildTimeSlots(
command.getStartTime(), command.getEndTime(), intervalMinutes);
result.put(definition.getTableName(), timeSlots);
}
return result;
}
/**
* 为每张表补充时间槽查找索引,避免窗口生成时重复线性扫描。
*
* @param timeSlotsByTable 按表名分组的时间槽
* @return 按表名分组的时间槽查找集合
*/
private Map<String, Set<LocalDateTime>> buildTimeSlotLookupByTable(Map<String, List<LocalDateTime>> timeSlotsByTable) {
Map<String, Set<LocalDateTime>> result = new LinkedHashMap<String, Set<LocalDateTime>>();
for (Map.Entry<String, List<LocalDateTime>> entry : timeSlotsByTable.entrySet()) {
result.put(entry.getKey(), new HashSet<LocalDateTime>(entry.getValue()));
}
return result;
}
/**
* 为每张表初始化跨窗口复用的待写入缓存。
*
* @return 按表名分组的待写入缓存
*/
private Map<String, List<List<Object>>> buildPendingRowsByTable() {
Map<String, List<List<Object>>> result = new LinkedHashMap<String, List<List<Object>>>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
result.put(definition.getTableName(), new ArrayList<List<Object>>());
}
return result;
}
/**
* 合并所有表的时间槽,得到统一的自然时间轴。
*
* @param timeSlotsByTable 按表名分组的时间槽
* @return 去重后的统一时间轴
*/
private List<LocalDateTime> mergeTimeSlots(Map<String, List<LocalDateTime>> timeSlotsByTable) {
TreeSet<LocalDateTime> merged = new TreeSet<LocalDateTime>();
for (List<LocalDateTime> timeSlots : timeSlotsByTable.values()) {
merged.addAll(timeSlots);
}
return new ArrayList<LocalDateTime>(merged);
}
/**
* 按多个时间点窗口生成待写入数据,不直接写库。
*
* @param command 任务命令
* @param timeWindow 当前时间窗口
* @param timeSlotLookupByTable 各表命中的时间槽索引
* @return 当前窗口生成结果
*/
private GeneratedBatchData generateBatchData(AddDataTaskCommand command, List<LocalDateTime> timeWindow,
Map<String, Set<LocalDateTime>> timeSlotLookupByTable) {
Map<String, List<List<Object>>> rowsByTable = new LinkedHashMap<String, List<List<Object>>>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
rowsByTable.put(definition.getTableName(), new ArrayList<List<Object>>());
}
for (LocalDateTime timeSlot : timeWindow) {
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
Set<LocalDateTime> tableTimeSlots = timeSlotLookupByTable.get(definition.getTableName());
if (!tableTimeSlots.contains(timeSlot)) {
continue;
}
List<List<Object>> rows = rowsByTable.get(definition.getTableName());
for (String lineId : command.getLineIds()) {
for (String phaseCode : definition.getPhaseCodes()) {
rows.add(addDataValueGenerator.generateRow(definition, lineId, timeSlot, phaseCode));
}
}
}
}
return new GeneratedBatchData(rowsByTable);
}
/**
* 写入当前窗口已经生成的数据。
*
* @param taskId 任务编号
* @param generatedBatchData 当前窗口生成结果
* @param pendingRowsByTable 跨窗口复用的待写入缓存
* @param batchNoMap 每张表的批次号缓存
* @return true 表示继续执行
*/
private boolean writeBatchData(String taskId, GeneratedBatchData generatedBatchData,
Map<String, List<List<Object>>> pendingRowsByTable, Map<String, Integer> batchNoMap) {
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
List<List<Object>> rows = generatedBatchData.getRows(definition.getTableName());
if (rows.isEmpty()) {
continue;
}
List<List<Object>> pendingRows = pendingRowsByTable.get(definition.getTableName());
pendingRows.addAll(rows);
int batchSize = definition.getBatchSize();
while (pendingRows.size() >= batchSize) {
List<List<Object>> batchRows = new ArrayList<List<Object>>(pendingRows.subList(0, batchSize));
int batchNo = nextBatchNo(batchNoMap, definition.getTableName());
if (!flushBatch(taskId, definition, batchRows, batchNo)) {
return false;
}
pendingRows.subList(0, batchSize).clear();
}
}
return true;
}
/**
* 任务结束后刷新所有未满批次的尾批数据。
*
* @param taskId 任务编号
* @param pendingRowsByTable 跨窗口复用的待写入缓存
* @param batchNoMap 每张表的批次号缓存
* @return true 表示继续执行
*/
private boolean flushRemainingBatchData(String taskId, Map<String, List<List<Object>>> pendingRowsByTable,
Map<String, Integer> batchNoMap) {
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
List<List<Object>> pendingRows = pendingRowsByTable.get(definition.getTableName());
if (pendingRows == null || pendingRows.isEmpty()) {
continue;
}
int batchNo = nextBatchNo(batchNoMap, definition.getTableName());
List<List<Object>> batchRows = new ArrayList<List<Object>>(pendingRows);
if (!flushBatch(taskId, definition, batchRows, batchNo)) {
return false;
}
pendingRows.clear();
}
return true;
}
/**
* 获取当前表的下一个批次号。
*
* @param batchNoMap 批次号缓存
* @param tableName 表名
* @return 下一个批次号
*/
private int nextBatchNo(Map<String, Integer> batchNoMap, String tableName) {
Integer currentBatchNo = batchNoMap.get(tableName);
int nextBatchNo = currentBatchNo == null ? 1 : currentBatchNo + 1;
batchNoMap.put(tableName, nextBatchNo);
return nextBatchNo;
}
/**
* 刷新当前批次。
*
* @param taskId 任务编号
* @param definition 表定义
* @param batchRows 批次行
* @param batchNo 批次号
* @return true 表示继续执行
*/
private boolean flushBatch(String taskId, AddDataTableDefinition definition, List<List<Object>> batchRows, int batchNo) {
addDataTaskStatusHolder.updateCurrentBatch(taskId, definition.getTableName(), batchNo, batchRows.size());
AddDataBatchWriteResult writeResult = addDataBatchWriter.writeBatch(definition, batchRows);
addDataTaskStatusHolder.addProgress(taskId, writeResult.getInsertedCount(), writeResult.getSkippedCount(), writeResult.getFailedCount());
if (writeResult.hasFailure()) {
addDataTaskStatusHolder.markFailed(taskId,
"" + definition.getTableName() + "" + batchNo + " 批执行失败:" + writeResult.getFirstFailureMessage());
return false;
}
return true;
}
/**
* 当前窗口生成结果。
*/
private static final class GeneratedBatchData {
/** 按表名分组的待写入行数据。 */
private final Map<String, List<List<Object>>> rowsByTable;
private GeneratedBatchData(Map<String, List<List<Object>>> rowsByTable) {
this.rowsByTable = rowsByTable;
}
/**
* 获取指定表的待写入行。
*
* @param tableName 表名
* @return 待写入行列表
*/
private List<List<Object>> getRows(String tableName) {
List<List<Object>> rows = rowsByTable.get(tableName);
return rows == null ? Collections.emptyList() : rows;
}
}
}

View File

@@ -0,0 +1,240 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand;
import com.njcn.gather.tool.adddata.pojo.enums.AddDataTaskStatusEnum;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
import com.njcn.gather.tool.adddata.util.AddDataDateTimeUtil;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 任务状态持有器。
*/
@Component
public class AddDataTaskStatusHolder {
/** 任务记录缓存。 */
private final ConcurrentMap<String, TaskRecord> taskRecordMap = new ConcurrentHashMap<String, TaskRecord>();
/** 时间槽计算组件。 */
private final AddDataTimeSlotCalculator addDataTimeSlotCalculator;
public AddDataTaskStatusHolder(AddDataTimeSlotCalculator addDataTimeSlotCalculator) {
this.addDataTimeSlotCalculator = addDataTimeSlotCalculator;
}
/**
* 创建等待中的任务。
*
* @param command 任务命令
* @return 任务状态快照
*/
public AddDataTaskStatusVO createWaitingTask(AddDataTaskCommand command) {
String taskId = UUID.randomUUID().toString().replace("-", "");
TaskRecord record = new TaskRecord(taskId, command, buildHourlyTimeResults(command));
taskRecordMap.put(taskId, record);
return copySnapshot(record);
}
/**
* 标记为运行中。
*
* @param taskId 任务编号
*/
public void markRunning(String taskId) {
TaskRecord record = requireRecord(taskId);
synchronized (record) {
record.status = AddDataTaskStatusEnum.RUNNING;
record.startTime = LocalDateTime.now();
record.currentBatchInfo = "任务已启动";
}
}
/**
* 更新当前批次信息。
*
* @param taskId 任务编号
* @param tableName 当前表名
* @param batchNo 批次号
* @param batchSize 批次大小
*/
public void updateCurrentBatch(String taskId, String tableName, int batchNo, int batchSize) {
TaskRecord record = requireRecord(taskId);
synchronized (record) {
record.currentTableName = tableName;
record.currentBatchInfo = "" + batchNo + " 批," + batchSize + "";
}
}
/**
* 累加执行统计。
*
* @param taskId 任务编号
* @param insertedCount 新增成功数
* @param skippedCount 跳过数
* @param failedCount 失败数
*/
public void addProgress(String taskId, long insertedCount, long skippedCount, long failedCount) {
TaskRecord record = requireRecord(taskId);
synchronized (record) {
record.insertedCount += insertedCount;
record.skippedCount += skippedCount;
record.failedCount += failedCount;
}
}
/**
* 标记任务成功。
*
* @param taskId 任务编号
*/
public void markSuccess(String taskId) {
TaskRecord record = requireRecord(taskId);
synchronized (record) {
record.status = AddDataTaskStatusEnum.SUCCESS;
record.endTime = LocalDateTime.now();
record.currentBatchInfo = "执行完成";
}
}
/**
* 标记任务失败。
*
* @param taskId 任务编号
* @param failureReason 失败原因
*/
public void markFailed(String taskId, String failureReason) {
TaskRecord record = requireRecord(taskId);
synchronized (record) {
record.status = AddDataTaskStatusEnum.FAILED;
record.endTime = LocalDateTime.now();
record.failureReason = failureReason;
record.currentBatchInfo = "执行失败";
}
}
/**
* 查询任务状态。
*
* @param taskId 任务编号
* @return 状态快照
*/
public AddDataTaskStatusVO getStatus(String taskId) {
return copySnapshot(requireRecord(taskId));
}
/**
* 获取任务命令。
*
* @param taskId 任务编号
* @return 任务命令
*/
public AddDataTaskCommand getCommand(String taskId) {
return requireRecord(taskId).command;
}
/**
* 获取全部任务编号。
*
* @return 任务编号列表
*/
public List<String> getTaskIds() {
return new ArrayList<String>(taskRecordMap.keySet());
}
/**
* 复制状态快照。
*
* @param record 任务记录
* @return 状态快照
*/
private AddDataTaskStatusVO copySnapshot(TaskRecord record) {
synchronized (record) {
AddDataTaskStatusVO status = new AddDataTaskStatusVO();
status.setTaskId(record.taskId);
status.setStatus(record.status.name());
status.setCurrentTableName(record.currentTableName);
status.setCurrentBatchInfo(record.currentBatchInfo);
status.setInsertedCount(record.insertedCount);
status.setSkippedCount(record.skippedCount);
status.setFailedCount(record.failedCount);
status.setFailureReason(record.failureReason);
status.setHourlyTimeResults(new ArrayList<String>(record.hourlyTimeResults));
status.setStartTime(AddDataDateTimeUtil.format(record.startTime));
status.setEndTime(AddDataDateTimeUtil.format(record.endTime));
return status;
}
}
/**
* 根据任务时间范围生成前端展示的每小时时刻。
*
* @param command 任务命令
* @return 每小时时刻文本
*/
private List<String> buildHourlyTimeResults(AddDataTaskCommand command) {
List<LocalDateTime> timeSlots = addDataTimeSlotCalculator.buildHourlyTimeSlots(command.getStartTime(), command.getEndTime());
List<String> result = new ArrayList<String>(timeSlots.size());
for (LocalDateTime timeSlot : timeSlots) {
result.add(AddDataDateTimeUtil.format(timeSlot));
}
return result;
}
/**
* 获取任务记录。
*
* @param taskId 任务编号
* @return 任务记录
*/
private TaskRecord requireRecord(String taskId) {
TaskRecord record = taskRecordMap.get(taskId);
if (record == null) {
throw new IllegalArgumentException("未找到补数任务:" + taskId);
}
return record;
}
/**
* 内存态任务记录。
*/
private static final class TaskRecord {
/** 任务编号。 */
private final String taskId;
/** 任务命令。 */
private final AddDataTaskCommand command;
/** 每小时返回给前端展示的业务时刻。 */
private final List<String> hourlyTimeResults;
/** 当前状态。 */
private AddDataTaskStatusEnum status = AddDataTaskStatusEnum.WAITING;
/** 当前表名。 */
private String currentTableName;
/** 当前批次。 */
private String currentBatchInfo = "等待执行";
/** 成功数。 */
private long insertedCount;
/** 跳过数。 */
private long skippedCount;
/** 失败数。 */
private long failedCount;
/** 失败原因。 */
private String failureReason;
/** 开始时间。 */
private LocalDateTime startTime;
/** 结束时间。 */
private LocalDateTime endTime;
private TaskRecord(String taskId, AddDataTaskCommand command, List<String> hourlyTimeResults) {
this.taskId = taskId;
this.command = command;
this.hourlyTimeResults = hourlyTimeResults;
}
}
}

View File

@@ -0,0 +1,97 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* 前端展示模板注册器。
*/
@Component
public class AddDataTemplateRegistry {
/** 模板列表。 */
private final List<AddDataTemplateVO> templates;
public AddDataTemplateRegistry() {
List<AddDataTemplateVO> result = new ArrayList<AddDataTemplateVO>();
result.add(createTemplate("电压有效值", "data_v", "ABC", Arrays.asList("A", "B", "C"),
"RMS_MAX", "RMS_MIN", "主值字段 RMS", "RMS_CP95"));
result.add(createTemplate("线电压有效值", "data_v", "T", Arrays.asList("T"),
"RMSAB/RMSBC/RMSCA 对应 MAX 字段", "RMSAB/RMSBC/RMSCA 对应 MIN 字段", "主值字段 RMSAB/RMSBC/RMSCA", "RMSAB/RMSBC/RMSCA 对应 CP95 字段"));
result.add(createTemplate("频率", "data_v", "T", Arrays.asList("T"),
"FREQ_MAX", "FREQ_MIN", "主值字段 FREQ", "FREQ_CP95"));
result.add(createTemplate("电压总谐波畸变率", "data_v", "ABC", Arrays.asList("A", "B", "C"),
"V_THD_MAX", "V_THD_MIN", "主值字段 V_THD", "V_THD_CP95"));
result.add(createTemplate("电流有效值", "data_i", "ABC", Arrays.asList("A", "B", "C"),
"RMS_MAX", "RMS_MIN", "主值字段 RMS", "RMS_CP95"));
result.add(createTemplate("电流总谐波畸变率", "data_i", "ABC", Arrays.asList("A", "B", "C"),
"I_THD_MAX", "I_THD_MIN", "主值字段 I_THD", "I_THD_CP95"));
result.add(createTemplate("电压谐波幅值", "data_harmphasic_v", "ABC", Arrays.asList("A", "B", "C"),
"各次 V_n_MAX", "各次 V_n_MIN", "主值字段 V_n", "各次 V_n_CP95"));
result.add(createTemplate("电流谐波幅值", "data_harmphasic_i", "ABC", Arrays.asList("A", "B", "C"),
"各次 I_n_MAX", "各次 I_n_MIN", "主值字段 I_n", "各次 I_n_CP95"));
result.add(createTemplate("电压谐波含有率", "data_harmrate_v", "ABC", Arrays.asList("A", "B", "C"),
"各次 V_n_MAX", "各次 V_n_MIN", "主值字段 V_n", "各次 V_n_CP95"));
result.add(createTemplate("电流谐波含有率", "data_harmrate_i", "ABC", Arrays.asList("A", "B", "C"),
"各次 I_n_MAX", "各次 I_n_MIN", "主值字段 I_n", "各次 I_n_CP95"));
result.add(createTemplate("间谐波电流", "data_inharm_i", "ABC", Arrays.asList("A", "B", "C"),
"各次 I_n_MAX", "各次 I_n_MIN", "主值字段 I_n", "各次 I_n_CP95"));
result.add(createTemplate("有功谐波功率", "data_harmpower_p", "ABC", Arrays.asList("A", "B", "C"),
"P_MAX 与各次 P_n_MAX", "P_MIN 与各次 P_n_MIN", "主值字段 P / P_n", "P_CP95 与各次 P_n_CP95"));
result.add(createTemplate("无功谐波功率", "data_harmpower_q", "ABC", Arrays.asList("A", "B", "C"),
"Q_MAX 与各次 Q_n_MAX", "Q_MIN 与各次 Q_n_MIN", "主值字段 Q / Q_n", "Q_CP95 与各次 Q_n_CP95"));
result.add(createTemplate("视在谐波功率", "data_harmpower_s", "ABC", Arrays.asList("A", "B", "C"),
"S_MAX 与各次 S_n_MAX", "S_MIN 与各次 S_n_MIN", "主值字段 S / S_n", "S_CP95 与各次 S_n_CP95"));
result.add(createTemplate("电压波动", "data_fluc", "T", Arrays.asList("T"),
"当前表无独立最大值字段", "当前表无独立最小值字段", "主值字段 FLUC", "当前表无独立 CP95 字段"));
result.add(createTemplate("短时闪变", "data_flicker", "T", Arrays.asList("T"),
"当前表无独立最大值字段", "当前表无独立最小值字段", "主值字段 PST", "当前表无独立 CP95 字段"));
result.add(createTemplate("长时闪变", "data_plt", "T", Arrays.asList("T"),
"当前表无独立最大值字段", "当前表无独立最小值字段", "主值字段 PLT", "当前表无独立 CP95 字段"));
templates = Collections.unmodifiableList(result);
}
/**
* 返回模板列表。
*
* @return 模板列表
*/
public List<AddDataTemplateVO> getTemplates() {
return templates;
}
/**
* 创建模板对象。
*
* @param parameterName 参数名称
* @param tableName 表名
* @param phaseDisplay 展示相别
* @param phaseCodes 落库相别
* @param maxRule 最大值规则
* @param minRule 最小值规则
* @param averageRule 平均值规则
* @param cp95Rule cp95 规则
* @return 模板对象
*/
private AddDataTemplateVO createTemplate(String parameterName, String tableName, String phaseDisplay, List<String> phaseCodes,
String maxRule, String minRule, String averageRule, String cp95Rule) {
AddDataTemplateVO template = new AddDataTemplateVO();
template.setParameterName(parameterName);
template.setTableName(tableName);
template.setPhaseDisplay(phaseDisplay);
template.setPhaseCodes(new ArrayList<String>(phaseCodes));
template.setDisplay(true);
template.setShowQualified(true);
template.setMaxValueRule(maxRule);
template.setMinValueRule(minRule);
template.setAverageValueRule(averageRule);
template.setCp95ValueRule(cp95Rule);
template.setDecimalScale(2);
return template;
}
}

View File

@@ -0,0 +1,77 @@
package com.njcn.gather.tool.adddata.component;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
/**
* 时间槽计算组件。
*/
@Component
public class AddDataTimeSlotCalculator {
/**
* 构建自然时间槽列表。
*
* @param startTime 开始时间
* @param endTime 结束时间
* @param intervalMinutes 步长,单位分钟
* @return 时间槽列表
*/
public List<LocalDateTime> buildTimeSlots(LocalDateTime startTime, LocalDateTime endTime, int intervalMinutes) {
if (intervalMinutes <= 0) {
throw new IllegalArgumentException("时间步长必须大于 0");
}
List<LocalDateTime> result = new ArrayList<LocalDateTime>();
LocalDateTime cursor = alignToNextSlot(startTime, intervalMinutes);
while (!cursor.isAfter(endTime)) {
result.add(cursor);
cursor = cursor.plusMinutes(intervalMinutes);
}
return result;
}
/**
* 构建返回给前端展示的每小时自然时刻列表。
*
* @param startTime 开始时间
* @param endTime 结束时间
* @return 每小时自然时刻列表
*/
public List<LocalDateTime> buildHourlyTimeSlots(LocalDateTime startTime, LocalDateTime endTime) {
return buildTimeSlots(startTime, endTime, 60);
}
/**
* 计算时间槽数量。
*
* @param startTime 开始时间
* @param endTime 结束时间
* @param intervalMinutes 步长
* @return 时间槽数量
*/
public long countTimeSlots(LocalDateTime startTime, LocalDateTime endTime, int intervalMinutes) {
return buildTimeSlots(startTime, endTime, intervalMinutes).size();
}
/**
* 向上对齐到最近的自然时间槽。
*
* @param time 原始时间
* @param intervalMinutes 步长
* @return 对齐后的时间
*/
public LocalDateTime alignToNextSlot(LocalDateTime time, int intervalMinutes) {
LocalDateTime minuteFloor = time.truncatedTo(ChronoUnit.MINUTES);
int minuteOfDay = minuteFloor.getHour() * 60 + minuteFloor.getMinute();
int remainder = minuteOfDay % intervalMinutes;
LocalDateTime aligned = minuteFloor.minusMinutes(remainder);
if (remainder != 0 || !minuteFloor.equals(time)) {
aligned = aligned.plusMinutes(intervalMinutes);
}
return aligned;
}
}

View File

@@ -0,0 +1,579 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SplittableRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 电能质量数据生成器。
*/
@Component
public class AddDataValueGenerator {
/** 谐波列匹配。 */
private static final Pattern HARMONIC_PATTERN = Pattern.compile("^([VIQPS])_(\\d+)$");
/** 基础列后缀。 */
private static final String SUFFIX_MAX = "_MAX";
private static final String SUFFIX_MIN = "_MIN";
private static final String SUFFIX_CP95 = "_CP95";
/** 两倍 PI。 */
private static final double TWO_PI = Math.PI * 2D;
/**
* 生成一行落库值。
*
* @param definition 表定义
* @param lineId 监测点 ID
* @param timeId 时间点
* @param phaseCode 相别
* @return 与字段顺序一致的值列表
*/
public List<Object> generateRow(AddDataTableDefinition definition, String lineId, LocalDateTime timeId, String phaseCode) {
MeasurementState state = buildState(lineId, timeId, phaseCode);
Map<String, Double> baseValues = buildBaseValues(definition, state);
List<Object> row = new ArrayList<Object>(definition.getColumns().size());
for (String column : definition.getColumns()) {
if ("TIMEID".equals(column)) {
row.add(Timestamp.valueOf(timeId));
continue;
}
if ("LINEID".equals(column)) {
row.add(lineId);
continue;
}
if ("PHASIC_TYPE".equals(column)) {
row.add(phaseCode);
continue;
}
if ("QUALITYFLAG".equals(column)) {
row.add(1);
continue;
}
row.add(resolveColumnValue(definition.getTableName(), column, baseValues, state));
}
return row;
}
/**
* 按列名解析字段值。
*
* @param tableName 表名
* @param column 列名
* @param baseValues 主值集合
* @param state 基础状态
* @return 对应值
*/
private Double resolveColumnValue(String tableName, String column, Map<String, Double> baseValues, MeasurementState state) {
if (baseValues.containsKey(column)) {
return baseValues.get(column);
}
if (column.endsWith(SUFFIX_MAX)) {
return deriveMetric(baseValues.get(removeSuffix(column, SUFFIX_MAX)), column, state, MetricType.MAX);
}
if (column.endsWith(SUFFIX_MIN)) {
return deriveMetric(baseValues.get(removeSuffix(column, SUFFIX_MIN)), column, state, MetricType.MIN);
}
if (column.endsWith(SUFFIX_CP95)) {
return deriveMetric(baseValues.get(removeSuffix(column, SUFFIX_CP95)), column, state, MetricType.CP95);
}
throw new IllegalStateException("未支持的 add-data 列:" + tableName + "." + column);
}
/**
* 生成主值列集合。
*
* @param definition 表定义
* @param state 基础状态
* @return 主值集合
*/
private Map<String, Double> buildBaseValues(AddDataTableDefinition definition, MeasurementState state) {
Map<String, Double> values = new HashMap<String, Double>();
for (String column : definition.getColumns()) {
if (isInfrastructureColumn(column) || isDerivedColumn(column)) {
continue;
}
values.put(column, resolveBaseMetric(definition.getTableName(), column, state));
}
return values;
}
/**
* 解析基础主值。
*
* @param tableName 表名
* @param column 列名
* @param state 基础状态
* @return 主值
*/
private Double resolveBaseMetric(String tableName, String column, MeasurementState state) {
Matcher matcher = HARMONIC_PATTERN.matcher(column);
if (matcher.matches()) {
String prefix = matcher.group(1);
int order = Integer.parseInt(matcher.group(2));
if ("V".equals(prefix)) {
if ("data_harmrate_v".equals(tableName)) {
return resolveRatioPercent(state.vHarmonics[order], state.vHarmonics[1]);
}
return state.vHarmonics[order];
}
if ("I".equals(prefix)) {
if ("data_harmrate_i".equals(tableName)) {
return resolveRatioPercent(state.iHarmonics[order], state.iHarmonics[1]);
}
if ("data_inharm_i".equals(tableName)) {
return state.iInharmonics[order];
}
return state.iHarmonics[order];
}
if ("P".equals(prefix)) {
return state.pHarmonics[order];
}
if ("Q".equals(prefix)) {
return state.qHarmonics[order];
}
if ("S".equals(prefix)) {
return state.sHarmonics[order];
}
}
switch (column) {
case "RMS":
return "data_v".equals(tableName) ? state.phaseVoltage : state.currentRms;
case "RMSAB":
return state.lineVoltageAB;
case "RMSBC":
return state.lineVoltageBC;
case "RMSCA":
return state.lineVoltageCA;
case "VU_DEV":
return state.vUpperDeviation;
case "VL_DEV":
return state.vLowerDeviation;
case "FREQ":
return state.frequency;
case "FREQ_DEV":
return state.frequencyDeviation;
case "V_UNBALANCE":
return state.vUnbalance;
case "V_POS":
return state.vPositive;
case "V_NEG":
return state.vNegative;
case "V_ZERO":
return state.vZero;
case "V_THD":
return state.vThd;
case "I_UNBALANCE":
return state.iUnbalance;
case "I_POS":
return state.iPositive;
case "I_NEG":
return state.iNegative;
case "I_ZERO":
return state.iZero;
case "I_THD":
return state.iThd;
case "PF":
return state.powerFactor;
case "DF":
return state.distortionFactor;
case "P":
return state.activePower;
case "Q":
return state.reactivePower;
case "S":
return state.apparentPower;
case "FLUC":
return state.fluc;
case "PST":
return state.pst;
case "PLT":
return state.plt;
case "FLUCCF":
return state.flucCf;
default:
throw new IllegalStateException("未支持的基础指标列:" + tableName + "." + column);
}
}
/**
* 生成主值派生指标。
*
* @param baseValue 主值
* @param column 派生列名
* @param state 基础状态
* @param metricType 派生类型
* @return 派生值
*/
private Double deriveMetric(Double baseValue, String column, MeasurementState state, MetricType metricType) {
if (baseValue == null) {
throw new IllegalStateException("派生字段缺少主值:" + column);
}
double factor = noise(state.sharedSeed + column.hashCode(), 0.01D, 0.05D);
double delta = Math.max(Math.abs(baseValue) * factor, 0.005D);
double value;
if (MetricType.MAX.equals(metricType)) {
value = baseValue + delta;
} else if (MetricType.MIN.equals(metricType)) {
value = baseValue - delta;
if (baseValue >= 0D) {
value = Math.max(0D, value);
}
} else {
value = baseValue + delta * 0.6D;
}
return round(value, 4);
}
/**
* 构建同源基础状态。
*
* @param lineId 监测点 ID
* @param timeId 时间
* @param phaseCode 相别
* @return 基础状态
*/
private MeasurementState buildState(String lineId, LocalDateTime timeId, String phaseCode) {
long sharedSeed = Objects.hash(lineId, timeId.getYear(), timeId.getDayOfYear(), timeId.getHour(), timeId.getMinute());
long phaseSeed = Objects.hash(sharedSeed, phaseCode);
double minuteOfDay = timeId.getHour() * 60D + timeId.getMinute();
double dayWave = Math.sin(minuteOfDay / 1440D * TWO_PI);
double fastWave = Math.cos(minuteOfDay / 240D * TWO_PI);
double phaseWave = Math.sin(minuteOfDay / 1440D * TWO_PI + resolvePhaseOffset(phaseCode));
double phaseVoltage = round(220D + dayWave * 3.6D + phaseWave * 1.5D + noise(phaseSeed + 11L, -0.9D, 0.9D), 4);
double lineVoltageAB = round(380D + dayWave * 4.8D + noise(sharedSeed + 21L, -1.2D, 1.2D), 4);
double lineVoltageBC = round(381D + fastWave * 3.6D + noise(sharedSeed + 22L, -1.2D, 1.2D), 4);
double lineVoltageCA = round(379D + Math.sin(minuteOfDay / 720D * TWO_PI) * 4.2D + noise(sharedSeed + 23L, -1.2D, 1.2D), 4);
double frequency = round(50D + Math.sin(minuteOfDay / 180D * TWO_PI) * 0.03D + noise(sharedSeed + 31L, -0.01D, 0.01D), 4);
double frequencyDeviation = round(frequency - 50D, 4);
double currentRms = round(85D + fastWave * 12D + phaseWave * 6D + noise(phaseSeed + 41L, -2D, 2D), 4);
double vPositive = round(phaseVoltage * (0.992D + noise(sharedSeed + 51L, -0.003D, 0.003D)), 4);
double vNegative = round(Math.max(0D, phaseVoltage * (0.008D + noise(phaseSeed + 52L, 0D, 0.006D))), 4);
double vZero = round(Math.max(0D, phaseVoltage * (0.006D + noise(phaseSeed + 53L, 0D, 0.004D))), 4);
double vUnbalance = resolveRatioPercent(vNegative, vPositive);
double vUpperDeviation = round(Math.max((phaseVoltage - 220D) / 220D * 100D, 0D), 4);
double vLowerDeviation = round(Math.max((220D - phaseVoltage) / 220D * 100D, 0D), 4);
double iPositive = round(currentRms * (0.986D + noise(sharedSeed + 61L, -0.003D, 0.003D)), 4);
double iNegative = round(Math.max(0D, currentRms * (0.016D + noise(phaseSeed + 62L, 0D, 0.01D))), 4);
double iZero = round(Math.max(0D, currentRms * (0.011D + noise(phaseSeed + 63L, 0D, 0.008D))), 4);
double iUnbalance = resolveRatioPercent(iNegative, iPositive);
double[] vHarmonics = new double[51];
double[] iHarmonics = new double[51];
double[] iInharmonics = new double[51];
vHarmonics[1] = phaseVoltage;
iHarmonics[1] = currentRms;
iInharmonics[1] = round(currentRms * 0.06D, 4);
for (int order = 2; order <= 50; order++) {
double voltageRatio = (0.015D / order) * (1D + noise(sharedSeed + order, -0.35D, 0.35D));
double currentRatio = (0.08D / Math.sqrt(order)) * (1D + noise(phaseSeed + order, -0.4D, 0.4D));
vHarmonics[order] = round(Math.max(0D, phaseVoltage * voltageRatio), 4);
iHarmonics[order] = round(Math.max(0D, currentRms * currentRatio), 4);
iInharmonics[order] = round(Math.max(0D, iHarmonics[order] * noise(phaseSeed + order * 13L, 0.08D, 0.18D)), 4);
}
double vThd = round(resolveThd(vHarmonics), 4);
double iThd = round(resolveThd(iHarmonics), 4);
double[] pHarmonics = new double[51];
double[] qHarmonics = new double[51];
double[] sHarmonics = new double[51];
double activePower = 0D;
double reactivePower = 0D;
double apparentPower = 0D;
for (int order = 1; order <= 50; order++) {
double apparent = Math.max(0D, round(vHarmonics[order] * iHarmonics[order] / 1000D, 4));
double angle = 0.16D + order * 0.008D + noise(sharedSeed + order * 17L, -0.03D, 0.03D);
double active = round(apparent * Math.cos(angle), 4);
double reactive = round(apparent * Math.sin(angle), 4);
pHarmonics[order] = active;
qHarmonics[order] = reactive;
sHarmonics[order] = round(Math.sqrt(active * active + reactive * reactive), 4);
activePower += active;
reactivePower += reactive;
apparentPower += sHarmonics[order];
}
activePower = round(activePower, 4);
reactivePower = round(reactivePower, 4);
apparentPower = round(apparentPower, 4);
double powerFactor = apparentPower == 0D ? 1D : round(clamp(activePower / apparentPower, -1D, 1D), 4);
double distortionFactor = round(1D / Math.sqrt(1D + Math.pow(vThd / 100D, 2D) + Math.pow(iThd / 100D, 2D)), 4);
double fluc = round(0.35D + Math.abs(dayWave) * 0.18D + noise(sharedSeed + 71L, -0.03D, 0.03D), 4);
double pst = round(fluc * 1.12D + 0.08D + noise(sharedSeed + 72L, -0.02D, 0.02D), 4);
double plt = round(Math.max(0.01D, pst * 0.92D + 0.05D + noise(sharedSeed + 73L, -0.02D, 0.02D)), 4);
double flucCf = round(fluc * (0.96D + noise(sharedSeed + 74L, -0.03D, 0.03D)), 4);
return new MeasurementState(sharedSeed, phaseVoltage, lineVoltageAB, lineVoltageBC, lineVoltageCA,
frequency, frequencyDeviation, currentRms, vPositive, vNegative, vZero, vUnbalance,
vUpperDeviation, vLowerDeviation, iPositive, iNegative, iZero, iUnbalance,
vHarmonics, iHarmonics, iInharmonics, vThd, iThd, pHarmonics, qHarmonics, sHarmonics,
activePower, reactivePower, apparentPower, powerFactor, distortionFactor, fluc, pst, plt, flucCf);
}
/**
* 解析百分比。
*
* @param part 分子
* @param total 分母
* @return 百分比
*/
private double resolveRatioPercent(double part, double total) {
if (total == 0D) {
return 0D;
}
return round(part / total * 100D, 4);
}
/**
* 解析 THD。
*
* @param harmonics 谐波数组
* @return thd 百分比
*/
private double resolveThd(double[] harmonics) {
if (harmonics[1] == 0D) {
return 0D;
}
double sum = 0D;
for (int order = 2; order < harmonics.length; order++) {
sum += harmonics[order] * harmonics[order];
}
return Math.sqrt(sum) / harmonics[1] * 100D;
}
/**
* 解析相别偏移角。
*
* @param phaseCode 相别
* @return 偏移角
*/
private double resolvePhaseOffset(String phaseCode) {
if ("B".equals(phaseCode)) {
return TWO_PI / 3D;
}
if ("C".equals(phaseCode)) {
return TWO_PI / 3D * 2D;
}
return 0D;
}
/**
* 生成稳定随机扰动。
*
* @param seed 种子
* @param min 最小值
* @param max 最大值
* @return 扰动值
*/
private double noise(long seed, double min, double max) {
return min + (max - min) * new SplittableRandom(seed).nextDouble();
}
/**
* 截断小数位。
*
* @param value 原始值
* @param scale 保留位数
* @return 处理后的值
*/
private double round(double value, int scale) {
if (Math.abs(value) < 0.0000001D) {
return 0D;
}
return BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_UP).doubleValue();
}
/**
* 移除列后缀。
*
* @param column 列名
* @param suffix 后缀
* @return 主值列名
*/
private String removeSuffix(String column, String suffix) {
return column.substring(0, column.length() - suffix.length());
}
/**
* 判断是否为系统字段。
*
* @param column 列名
* @return true 表示系统字段
*/
private boolean isInfrastructureColumn(String column) {
return "TIMEID".equals(column) || "LINEID".equals(column) || "PHASIC_TYPE".equals(column) || "QUALITYFLAG".equals(column);
}
/**
* 判断是否为派生列。
*
* @param column 列名
* @return true 表示派生列
*/
private boolean isDerivedColumn(String column) {
return column.endsWith(SUFFIX_MAX) || column.endsWith(SUFFIX_MIN) || column.endsWith(SUFFIX_CP95);
}
/**
* 数值类型。
*/
private enum MetricType {
/** 最大值。 */
MAX,
/** 最小值。 */
MIN,
/** cp95。 */
CP95
}
/**
* 数值约束。
*
* @param value 值
* @param min 最小值
* @param max 最大值
* @return 限幅结果
*/
private double clamp(double value, double min, double max) {
return Math.max(min, Math.min(max, value));
}
/**
* 同源基础状态。
*/
private static final class MeasurementState {
/** 共享种子。 */
private final long sharedSeed;
/** 电压有效值。 */
private final double phaseVoltage;
/** AB 线电压。 */
private final double lineVoltageAB;
/** BC 线电压。 */
private final double lineVoltageBC;
/** CA 线电压。 */
private final double lineVoltageCA;
/** 频率。 */
private final double frequency;
/** 频偏。 */
private final double frequencyDeviation;
/** 电流有效值。 */
private final double currentRms;
/** 电压正序。 */
private final double vPositive;
/** 电压负序。 */
private final double vNegative;
/** 电压零序。 */
private final double vZero;
/** 电压不平衡度。 */
private final double vUnbalance;
/** 电压上偏差。 */
private final double vUpperDeviation;
/** 电压下偏差。 */
private final double vLowerDeviation;
/** 电流正序。 */
private final double iPositive;
/** 电流负序。 */
private final double iNegative;
/** 电流零序。 */
private final double iZero;
/** 电流不平衡度。 */
private final double iUnbalance;
/** 电压谐波。 */
private final double[] vHarmonics;
/** 电流谐波。 */
private final double[] iHarmonics;
/** 间谐波电流。 */
private final double[] iInharmonics;
/** 电压 thd。 */
private final double vThd;
/** 电流 thd。 */
private final double iThd;
/** 有功谐波功率。 */
private final double[] pHarmonics;
/** 无功谐波功率。 */
private final double[] qHarmonics;
/** 视在谐波功率。 */
private final double[] sHarmonics;
/** 总有功。 */
private final double activePower;
/** 总无功。 */
private final double reactivePower;
/** 总视在。 */
private final double apparentPower;
/** 功率因数。 */
private final double powerFactor;
/** 畸变因数。 */
private final double distortionFactor;
/** 波动值。 */
private final double fluc;
/** 短时闪变。 */
private final double pst;
/** 长时闪变。 */
private final double plt;
/** 波动修正值。 */
private final double flucCf;
private MeasurementState(long sharedSeed, double phaseVoltage, double lineVoltageAB, double lineVoltageBC, double lineVoltageCA,
double frequency, double frequencyDeviation, double currentRms, double vPositive, double vNegative,
double vZero, double vUnbalance, double vUpperDeviation, double vLowerDeviation, double iPositive,
double iNegative, double iZero, double iUnbalance, double[] vHarmonics, double[] iHarmonics,
double[] iInharmonics, double vThd, double iThd, double[] pHarmonics, double[] qHarmonics,
double[] sHarmonics, double activePower, double reactivePower, double apparentPower,
double powerFactor, double distortionFactor, double fluc, double pst, double plt, double flucCf) {
this.sharedSeed = sharedSeed;
this.phaseVoltage = phaseVoltage;
this.lineVoltageAB = lineVoltageAB;
this.lineVoltageBC = lineVoltageBC;
this.lineVoltageCA = lineVoltageCA;
this.frequency = frequency;
this.frequencyDeviation = frequencyDeviation;
this.currentRms = currentRms;
this.vPositive = vPositive;
this.vNegative = vNegative;
this.vZero = vZero;
this.vUnbalance = vUnbalance;
this.vUpperDeviation = vUpperDeviation;
this.vLowerDeviation = vLowerDeviation;
this.iPositive = iPositive;
this.iNegative = iNegative;
this.iZero = iZero;
this.iUnbalance = iUnbalance;
this.vHarmonics = vHarmonics;
this.iHarmonics = iHarmonics;
this.iInharmonics = iInharmonics;
this.vThd = vThd;
this.iThd = iThd;
this.pHarmonics = pHarmonics;
this.qHarmonics = qHarmonics;
this.sHarmonics = sHarmonics;
this.activePower = activePower;
this.reactivePower = reactivePower;
this.apparentPower = apparentPower;
this.powerFactor = powerFactor;
this.distortionFactor = distortionFactor;
this.fluc = fluc;
this.pst = pst;
this.plt = plt;
this.flucCf = flucCf;
}
}
}

View File

@@ -0,0 +1,37 @@
package com.njcn.gather.tool.adddata.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* add-data 模块任务线程池配置。
*/
@Slf4j
@Configuration
public class AddDataExecutorConfig {
@Bean(name = "addDataTaskExecutorService", destroyMethod = "shutdown")
public ExecutorService addDataTaskExecutorService() {
AtomicInteger threadIndex = new AtomicInteger(1);
return new ThreadPoolExecutor(
1,
1,
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(8),
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("add-data-task-" + threadIndex.getAndIncrement());
return thread;
},
(runnable, executor) -> log.warn("数据补录任务线程池已满,拒绝新的补数任务")
);
}
}

View File

@@ -0,0 +1,90 @@
package com.njcn.gather.tool.adddata.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.LogUtil;
import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
import com.njcn.gather.tool.adddata.service.AddDataTaskService;
import com.njcn.web.controller.BaseController;
import com.njcn.web.utils.HttpResultUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 数据补录任务接口。
*/
@Validated
@Slf4j
@Api(tags = "数据补录任务")
@RestController
@RequestMapping("/addData/task")
@RequiredArgsConstructor
public class AddDataTaskController extends BaseController {
/** 数据补录任务服务。 */
private final AddDataTaskService addDataTaskService;
/**
* 预估本次补数规模。
*
* @param param 补数参数
* @return 预估结果
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("预估电能质量批量补数规模")
@PostMapping("/preview")
public HttpResult<AddDataPreviewVO> preview(@RequestBody @Validated AddDataTaskRequestParam param) {
String methodDescribe = getMethodDescribe("preview");
LogUtil.njcnDebug(log, "{}开始预估补数规模lineCount={}, intervalMinutes={}",
methodDescribe, param.getLineIds() == null ? 0 : param.getLineIds().size(), param.getIntervalMinutes());
AddDataPreviewVO result = addDataTaskService.preview(param);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
/**
* 创建后台补数任务。
*
* @param param 补数参数
* @return 任务编号
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("创建电能质量批量补数任务")
@PostMapping("/create")
public HttpResult<AddDataTaskCreateVO> create(@RequestBody @Validated AddDataTaskRequestParam param) {
String methodDescribe = getMethodDescribe("create");
LogUtil.njcnDebug(log, "{}开始创建补数任务lineCount={}, intervalMinutes={}",
methodDescribe, param.getLineIds() == null ? 0 : param.getLineIds().size(), param.getIntervalMinutes());
AddDataTaskCreateVO result = addDataTaskService.create(param);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
/**
* 查询任务状态。
*
* @param taskId 任务编号
* @return 当前任务状态
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("查询电能质量批量补数任务状态")
@GetMapping("/status/{taskId}")
public HttpResult<AddDataTaskStatusVO> status(@PathVariable("taskId") String taskId) {
String methodDescribe = getMethodDescribe("status");
LogUtil.njcnDebug(log, "{}开始查询补数任务状态taskId={}", methodDescribe, taskId);
AddDataTaskStatusVO result = addDataTaskService.getStatus(taskId);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
}

View File

@@ -0,0 +1,49 @@
package com.njcn.gather.tool.adddata.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.LogUtil;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO;
import com.njcn.gather.tool.adddata.service.AddDataTemplateService;
import com.njcn.web.controller.BaseController;
import com.njcn.web.utils.HttpResultUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* 数据补录模板展示接口。
*/
@Slf4j
@Api(tags = "数据补录模板")
@RestController
@RequestMapping("/addData/template")
@RequiredArgsConstructor
public class AddDataTemplateController extends BaseController {
/** 数据补录模板服务。 */
private final AddDataTemplateService addDataTemplateService;
/**
* 返回前端模板展示规则。
*
* @return 模板列表
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("查询数据补录模板规则")
@GetMapping("/list")
public HttpResult<List<AddDataTemplateVO>> list() {
String methodDescribe = getMethodDescribe("list");
LogUtil.njcnDebug(log, "{},开始查询数据补录模板规则", methodDescribe);
List<AddDataTemplateVO> result = addDataTemplateService.listTemplates();
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
}

View File

@@ -0,0 +1,38 @@
package com.njcn.gather.tool.adddata.pojo.bo;
import lombok.Getter;
/**
* 批量写入结果。
*/
@Getter
public class AddDataBatchWriteResult {
/** 成功插入条数。 */
private final long insertedCount;
/** 因主键重复被跳过的条数。 */
private final long skippedCount;
/** 非主键冲突失败条数。 */
private final long failedCount;
/** 第一条失败原因。 */
private final String firstFailureMessage;
public AddDataBatchWriteResult(long insertedCount, long skippedCount, long failedCount, String firstFailureMessage) {
this.insertedCount = insertedCount;
this.skippedCount = skippedCount;
this.failedCount = failedCount;
this.firstFailureMessage = firstFailureMessage;
}
/**
* 当前批次是否存在失败。
*
* @return true 表示存在失败
*/
public boolean hasFailure() {
return failedCount > 0;
}
}

View File

@@ -0,0 +1,64 @@
package com.njcn.gather.tool.adddata.pojo.bo;
import lombok.Getter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* 数据补录表定义。
*/
@Getter
public class AddDataTableDefinition {
/** 分表时间轴类型。 */
public enum TimeAxisType {
/** 使用前端传入步长。 */
REQUEST_INTERVAL,
/** 固定 10 分钟。 */
FIXED_TEN_MINUTES,
/** 固定 2 小时。 */
FIXED_TWO_HOURS
}
/** 表名。 */
private final String tableName;
/** 字段列表,顺序与落库 SQL 保持一致。 */
private final List<String> columns;
/** 首版实际落库相别集合。 */
private final List<String> phaseCodes;
/** 单批写入大小。 */
private final int batchSize;
/** 时间轴类型。 */
private final TimeAxisType timeAxisType;
public AddDataTableDefinition(String tableName, List<String> columns, List<String> phaseCodes,
int batchSize, TimeAxisType timeAxisType) {
this.tableName = tableName;
this.columns = Collections.unmodifiableList(new ArrayList<String>(columns));
this.phaseCodes = Collections.unmodifiableList(new ArrayList<String>(phaseCodes));
this.batchSize = batchSize;
this.timeAxisType = timeAxisType;
}
/**
* 解析当前表真实使用的时间步长。
*
* @param requestIntervalMinutes 前端传入步长
* @return 当前表步长
*/
public int resolveIntervalMinutes(int requestIntervalMinutes) {
if (TimeAxisType.FIXED_TEN_MINUTES.equals(timeAxisType)) {
return 10;
}
if (TimeAxisType.FIXED_TWO_HOURS.equals(timeAxisType)) {
return 120;
}
return requestIntervalMinutes;
}
}

View File

@@ -0,0 +1,34 @@
package com.njcn.gather.tool.adddata.pojo.bo;
import lombok.Getter;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* 归一化后的补数任务命令。
*/
@Getter
public class AddDataTaskCommand {
/** 监测点 ID 列表。 */
private final List<String> lineIds;
/** 开始时间。 */
private final LocalDateTime startTime;
/** 结束时间。 */
private final LocalDateTime endTime;
/** 用户步长。 */
private final int intervalMinutes;
public AddDataTaskCommand(List<String> lineIds, LocalDateTime startTime, LocalDateTime endTime, int intervalMinutes) {
this.lineIds = Collections.unmodifiableList(new ArrayList<String>(lineIds));
this.startTime = startTime;
this.endTime = endTime;
this.intervalMinutes = intervalMinutes;
}
}

View File

@@ -0,0 +1,15 @@
package com.njcn.gather.tool.adddata.pojo.enums;
/**
* 数据补录任务状态。
*/
public enum AddDataTaskStatusEnum {
/** 等待执行。 */
WAITING,
/** 执行中。 */
RUNNING,
/** 执行成功。 */
SUCCESS,
/** 执行失败。 */
FAILED
}

View File

@@ -0,0 +1,35 @@
package com.njcn.gather.tool.adddata.pojo.param;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.List;
/**
* 数据补录任务请求参数。
*/
@Data
@ApiModel("数据补录任务请求参数")
public class AddDataTaskRequestParam {
@ApiModelProperty(value = "监测点 ID 列表", required = true)
@NotEmpty(message = "监测点 ID 列表不能为空")
private List<String> lineIds = new ArrayList<String>();
@ApiModelProperty(value = "开始时间,格式 yyyy-MM-dd HH:mm:ss", required = true)
@NotBlank(message = "开始时间不能为空")
private String startTime;
@ApiModelProperty(value = "结束时间,格式 yyyy-MM-dd HH:mm:ss", required = true)
@NotBlank(message = "结束时间不能为空")
private String endTime;
@ApiModelProperty(value = "用户步长,仅支持 1/3/5/10 分钟", required = true)
@NotNull(message = "时间步长不能为空")
private Integer intervalMinutes;
}

View File

@@ -0,0 +1,22 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import lombok.Data;
/**
* 单表预估结果。
*/
@Data
public class AddDataPreviewTableVO {
/** 表名。 */
private String tableName;
/** 当前表命中的时间点数量。 */
private long timePointCount;
/** 当前表实际展开的落库相别数量。 */
private int phaseCount;
/** 当前表预估写入条数。 */
private long rowCount;
}

View File

@@ -0,0 +1,25 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* 补数规模预估结果。
*/
@Data
public class AddDataPreviewVO {
/** 监测点数量。 */
private int lineCount;
/** 用户步长。 */
private int intervalMinutes;
/** 总预估条数。 */
private long totalRowCount;
/** 单表预估详情。 */
private List<AddDataPreviewTableVO> tableStats = new ArrayList<AddDataPreviewTableVO>();
}

View File

@@ -0,0 +1,16 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import lombok.Data;
/**
* 创建任务返回值。
*/
@Data
public class AddDataTaskCreateVO {
/** 任务编号。 */
private String taskId;
/** 初始状态。 */
private String status;
}

View File

@@ -0,0 +1,46 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* 数据补录任务状态。
*/
@Data
public class AddDataTaskStatusVO {
/** 任务编号。 */
private String taskId;
/** 当前状态。 */
private String status;
/** 当前执行表名。 */
private String currentTableName;
/** 当前批次描述。 */
private String currentBatchInfo;
/** 成功写入数量。 */
private long insertedCount;
/** 主键重复跳过数量。 */
private long skippedCount;
/** 非主键冲突失败数量。 */
private long failedCount;
/** 失败原因。 */
private String failureReason;
/** 每小时返回给前端展示的业务时刻。 */
private List<String> hourlyTimeResults = new ArrayList<String>();
/** 任务开始时间。 */
private String startTime;
/** 任务结束时间。 */
private String endTime;
}

View File

@@ -0,0 +1,46 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* 前端展示模板规则。
*/
@Data
public class AddDataTemplateVO {
/** 电能质量参数名称。 */
private String parameterName;
/** 关联表名。 */
private String tableName;
/** 前端展示相别。 */
private String phaseDisplay;
/** 实际落库相别集合。 */
private List<String> phaseCodes = new ArrayList<String>();
/** 当前模板是否展示。 */
private boolean display;
/** 是否展示是否合格列。 */
private boolean showQualified;
/** 最大值规则。 */
private String maxValueRule;
/** 最小值规则。 */
private String minValueRule;
/** 平均值规则。 */
private String averageValueRule;
/** 95% 概率大值规则。 */
private String cp95ValueRule;
/** 默认保留小数位数。 */
private int decimalScale;
}

View File

@@ -0,0 +1,36 @@
package com.njcn.gather.tool.adddata.service;
import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
/**
* 数据补录任务服务。
*/
public interface AddDataTaskService {
/**
* 预估补数规模。
*
* @param param 补数参数
* @return 预估结果
*/
AddDataPreviewVO preview(AddDataTaskRequestParam param);
/**
* 创建补数任务。
*
* @param param 补数参数
* @return 任务创建结果
*/
AddDataTaskCreateVO create(AddDataTaskRequestParam param);
/**
* 查询补数任务状态。
*
* @param taskId 任务编号
* @return 当前任务状态
*/
AddDataTaskStatusVO getStatus(String taskId);
}

View File

@@ -0,0 +1,18 @@
package com.njcn.gather.tool.adddata.service;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO;
import java.util.List;
/**
* 数据补录模板服务。
*/
public interface AddDataTemplateService {
/**
* 返回前端参数模板。
*
* @return 模板列表
*/
List<AddDataTemplateVO> listTemplates();
}

View File

@@ -0,0 +1,142 @@
package com.njcn.gather.tool.adddata.service.impl;
import com.njcn.gather.tool.adddata.component.AddDataTableRegistry;
import com.njcn.gather.tool.adddata.component.AddDataTaskExecutor;
import com.njcn.gather.tool.adddata.component.AddDataTaskStatusHolder;
import com.njcn.gather.tool.adddata.component.AddDataTimeSlotCalculator;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand;
import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewTableVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
import com.njcn.gather.tool.adddata.service.AddDataTaskService;
import com.njcn.gather.tool.adddata.util.AddDataDateTimeUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* 数据补录任务服务实现。
*/
@Service
@RequiredArgsConstructor
public class AddDataTaskServiceImpl implements AddDataTaskService {
/** 支持的用户步长。 */
private static final Set<Integer> SUPPORTED_INTERVALS = new LinkedHashSet<Integer>(Arrays.asList(1, 3, 5, 10));
/** 表定义注册器。 */
private final AddDataTableRegistry addDataTableRegistry;
/** 时间槽计算器。 */
private final AddDataTimeSlotCalculator addDataTimeSlotCalculator;
/** 任务状态持有器。 */
private final AddDataTaskStatusHolder addDataTaskStatusHolder;
/** 后台执行器。 */
private final AddDataTaskExecutor addDataTaskExecutor;
@Override
public AddDataPreviewVO preview(AddDataTaskRequestParam param) {
AddDataTaskCommand command = buildCommand(param);
AddDataPreviewVO result = new AddDataPreviewVO();
result.setLineCount(command.getLineIds().size());
result.setIntervalMinutes(command.getIntervalMinutes());
long totalRowCount = 0L;
List<AddDataPreviewTableVO> tableStats = new ArrayList<AddDataPreviewTableVO>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
long timePointCount = addDataTimeSlotCalculator.countTimeSlots(
command.getStartTime(), command.getEndTime(), definition.resolveIntervalMinutes(command.getIntervalMinutes()));
long rowCount = timePointCount * command.getLineIds().size() * definition.getPhaseCodes().size();
AddDataPreviewTableVO tableVO = new AddDataPreviewTableVO();
tableVO.setTableName(definition.getTableName());
tableVO.setTimePointCount(timePointCount);
tableVO.setPhaseCount(definition.getPhaseCodes().size());
tableVO.setRowCount(rowCount);
tableStats.add(tableVO);
totalRowCount += rowCount;
}
result.setTableStats(tableStats);
result.setTotalRowCount(totalRowCount);
return result;
}
@Override
public AddDataTaskCreateVO create(AddDataTaskRequestParam param) {
AddDataTaskCommand command = buildCommand(param);
AddDataTaskStatusVO snapshot = addDataTaskStatusHolder.createWaitingTask(command);
addDataTaskExecutor.submit(snapshot.getTaskId(), command);
AddDataTaskCreateVO result = new AddDataTaskCreateVO();
result.setTaskId(snapshot.getTaskId());
result.setStatus(snapshot.getStatus());
return result;
}
@Override
public AddDataTaskStatusVO getStatus(String taskId) {
if (taskId == null || taskId.trim().isEmpty()) {
throw new IllegalArgumentException("任务编号不能为空");
}
return addDataTaskStatusHolder.getStatus(taskId.trim());
}
/**
* 归一化任务命令。
*
* @param param 请求参数
* @return 任务命令
*/
private AddDataTaskCommand buildCommand(AddDataTaskRequestParam param) {
if (param == null) {
throw new IllegalArgumentException("补数参数不能为空");
}
Integer intervalMinutes = param.getIntervalMinutes();
if (intervalMinutes == null || !SUPPORTED_INTERVALS.contains(intervalMinutes)) {
throw new IllegalArgumentException("时间步长仅支持 1、3、5、10 分钟");
}
List<String> lineIds = normalizeLineIds(param.getLineIds());
LocalDateTime startTime = AddDataDateTimeUtil.parse(param.getStartTime());
LocalDateTime endTime = AddDataDateTimeUtil.parse(param.getEndTime());
if (startTime.isAfter(endTime)) {
throw new IllegalArgumentException("开始时间不能大于结束时间");
}
return new AddDataTaskCommand(lineIds, startTime, endTime, intervalMinutes);
}
/**
* 标准化监测点列表。
*
* @param lineIds 原始监测点列表
* @return 去重后的监测点列表
*/
private List<String> normalizeLineIds(List<String> lineIds) {
if (lineIds == null || lineIds.isEmpty()) {
throw new IllegalArgumentException("监测点 ID 列表不能为空");
}
LinkedHashSet<String> normalized = new LinkedHashSet<String>();
for (String lineId : lineIds) {
if (lineId == null) {
throw new IllegalArgumentException("监测点 ID 不能为空");
}
String normalizedLineId = lineId.trim();
// LINEID 已改为 char(32),这里同步限制字符串长度,避免运行时写入越界。
if (normalizedLineId.isEmpty()) {
throw new IllegalArgumentException("监测点 ID 不能为空");
}
if (normalizedLineId.length() > 32) {
throw new IllegalArgumentException("监测点 ID 长度不能超过 32 位");
}
normalized.add(normalizedLineId);
}
return new ArrayList<String>(normalized);
}
}

View File

@@ -0,0 +1,25 @@
package com.njcn.gather.tool.adddata.service.impl;
import com.njcn.gather.tool.adddata.component.AddDataTemplateRegistry;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO;
import com.njcn.gather.tool.adddata.service.AddDataTemplateService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* 数据补录模板服务实现。
*/
@Service
@RequiredArgsConstructor
public class AddDataTemplateServiceImpl implements AddDataTemplateService {
/** 模板注册器。 */
private final AddDataTemplateRegistry addDataTemplateRegistry;
@Override
public List<AddDataTemplateVO> listTemplates() {
return addDataTemplateRegistry.getTemplates();
}
}

View File

@@ -0,0 +1,51 @@
package com.njcn.gather.tool.adddata.util;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
/**
* add-data 模块时间工具。
*/
public final class AddDataDateTimeUtil {
/** 统一返回格式。 */
private static final DateTimeFormatter OUTPUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/** 允许的入参时间格式。 */
private static final DateTimeFormatter[] INPUT_FORMATTERS = new DateTimeFormatter[]{
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"),
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"),
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm")
};
private AddDataDateTimeUtil() {
}
/**
* 解析请求时间。
*
* @param timeText 时间文本
* @return 解析后的时间
*/
public static LocalDateTime parse(String timeText) {
for (DateTimeFormatter formatter : INPUT_FORMATTERS) {
try {
return LocalDateTime.parse(timeText, formatter);
} catch (DateTimeParseException ignored) {
}
}
throw new IllegalArgumentException("时间格式不正确,仅支持 yyyy-MM-dd HH:mm:ss 或 yyyy-MM-dd'T'HH:mm:ss");
}
/**
* 格式化时间。
*
* @param time 时间对象
* @return 格式化结果
*/
public static String format(LocalDateTime time) {
return time == null ? null : time.format(OUTPUT_FORMATTER);
}
}

View File

@@ -0,0 +1,28 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
/**
* 表定义注册测试。
*/
class AddDataTableRegistryTest {
@Test
void shouldLoadAllThirteenTablesFromSchema() throws Exception {
AddDataTableRegistry registry = new AddDataTableRegistry();
registry.afterPropertiesSet();
List<AddDataTableDefinition> definitions = registry.getTableDefinitions();
Assertions.assertEquals(13, definitions.size());
Assertions.assertEquals("data_flicker", definitions.get(0).getTableName());
Assertions.assertEquals("data_v", definitions.get(definitions.size() - 1).getTableName());
Assertions.assertTrue(registry.getDefinition("data_v").getColumns().contains("V_THD"));
Assertions.assertEquals(4, registry.getDefinition("data_v").getPhaseCodes().size());
Assertions.assertTrue(registry.getDefinition("data_v").getPhaseCodes().contains("T"));
}
}

View File

@@ -0,0 +1,33 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.Arrays;
/**
* 补数任务状态持有器测试。
*/
class AddDataTaskStatusHolderTest {
private final AddDataTaskStatusHolder holder = new AddDataTaskStatusHolder(new AddDataTimeSlotCalculator());
@Test
void shouldReturnHourlyTimeResultsWhenCreateTask() {
AddDataTaskCommand command = new AddDataTaskCommand(
Arrays.asList("1"),
LocalDateTime.of(2026, 4, 28, 10, 7, 0),
LocalDateTime.of(2026, 4, 28, 13, 0, 0),
5);
AddDataTaskStatusVO status = holder.createWaitingTask(command);
Assertions.assertEquals(Arrays.asList(
"2026-04-28 11:00:00",
"2026-04-28 12:00:00",
"2026-04-28 13:00:00"), status.getHourlyTimeResults());
}
}

View File

@@ -0,0 +1,33 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* 模板注册测试。
*/
class AddDataTemplateRegistryTest {
@Test
void shouldOnlyExposeABCTPhaseCodes() {
AddDataTemplateRegistry registry = new AddDataTemplateRegistry();
List<AddDataTemplateVO> templates = registry.getTemplates();
Set<String> allowedPhaseCodes = new HashSet<String>();
allowedPhaseCodes.add("A");
allowedPhaseCodes.add("B");
allowedPhaseCodes.add("C");
allowedPhaseCodes.add("T");
for (AddDataTemplateVO template : templates) {
for (String phaseCode : template.getPhaseCodes()) {
Assertions.assertTrue(allowedPhaseCodes.contains(phaseCode));
}
}
}
}

View File

@@ -0,0 +1,61 @@
package com.njcn.gather.tool.adddata.component;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.List;
/**
* 时间槽计算测试。
*/
class AddDataTimeSlotCalculatorTest {
private final AddDataTimeSlotCalculator calculator = new AddDataTimeSlotCalculator();
@Test
void shouldAlignToNextNaturalSlot() {
LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 7, 12);
LocalDateTime end = LocalDateTime.of(2026, 4, 28, 10, 22, 0);
List<LocalDateTime> slots = calculator.buildTimeSlots(start, end, 5);
Assertions.assertEquals(3, slots.size());
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 10, 10, 0), slots.get(0));
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 10, 20, 0), slots.get(2));
}
@Test
void shouldReturnEmptyWhenRangeDoesNotContainAnySlot() {
LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 7, 0);
LocalDateTime end = LocalDateTime.of(2026, 4, 28, 10, 9, 59);
List<LocalDateTime> slots = calculator.buildTimeSlots(start, end, 10);
Assertions.assertTrue(slots.isEmpty());
}
@Test
void shouldBuildHourlySlotsFromNextNaturalHour() {
LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 7, 0);
LocalDateTime end = LocalDateTime.of(2026, 4, 28, 13, 0, 0);
List<LocalDateTime> slots = calculator.buildHourlyTimeSlots(start, end);
Assertions.assertEquals(3, slots.size());
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 11, 0, 0), slots.get(0));
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 13, 0, 0), slots.get(2));
}
@Test
void shouldIncludeStartWhenAlreadyAtNaturalHour() {
LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 0, 0);
LocalDateTime end = LocalDateTime.of(2026, 4, 28, 12, 30, 0);
List<LocalDateTime> slots = calculator.buildHourlyTimeSlots(start, end);
Assertions.assertEquals(3, slots.size());
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 10, 0, 0), slots.get(0));
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 12, 0, 0), slots.get(2));
}
}

View File

@@ -0,0 +1,32 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.List;
/**
* 数值生成器测试。
*/
class AddDataValueGeneratorTest {
@Test
void shouldGenerateStableDataVRowWithExpectedColumnCount() throws Exception {
AddDataTableRegistry registry = new AddDataTableRegistry();
registry.afterPropertiesSet();
AddDataTableDefinition definition = registry.getDefinition("data_v");
AddDataValueGenerator generator = new AddDataValueGenerator();
List<Object> row = generator.generateRow(definition, "f04a9d62e3d24e6580e4f32b40967505", LocalDateTime.of(2026, 4, 28, 10, 10, 0), "A");
Assertions.assertEquals(definition.getColumns().size(), row.size());
Assertions.assertEquals("A", row.get(definition.getColumns().indexOf("PHASIC_TYPE")));
Double rms = (Double) row.get(definition.getColumns().indexOf("RMS"));
Double rmsMax = (Double) row.get(definition.getColumns().indexOf("RMS_MAX"));
Double rmsMin = (Double) row.get(definition.getColumns().indexOf("RMS_MIN"));
Assertions.assertTrue(rmsMax >= rms);
Assertions.assertTrue(rmsMin <= rms);
}
}

View File

@@ -21,6 +21,7 @@
<module>activate-tool</module>
<module>mms-mapping</module>
<module>wave-tool</module>
<module>add-data</module>
</modules>
</project>