feat(tools): 新增数据补录功能模块

- 实现批量写入组件,支持 INSERT IGNORE 操作和降级逐行写入
- 添加表定义注册器,自动解析 SQL 元数据并注册 13 张补数表
- 集成异步任务执行器,支持后台补数任务提交和状态管理
- 创建数据补录控制器,提供预估、创建和查询任务状态接口
- 实现时间槽计算和数据生成逻辑,支持多相别数据补录
- 添加线程池配置和任务状态持有器,确保任务并发执行安全
- 创建预览功能,估算补数任务的规模和影响范围
This commit is contained in:
2026-04-30 09:01:43 +08:00
parent 04e1dc2659
commit 6258c2dda5
34 changed files with 2614 additions and 0 deletions

41
tools/add-data/pom.xml Normal file
View File

@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.njcn.gather</groupId>
<artifactId>tools</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>add-data</artifactId>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>njcn-common</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>spingboot2.3.12</artifactId>
<version>2.3.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,135 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataBatchWriteResult;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* 批量写入组件。
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class AddDataBatchWriter {
/** JDBC 模板。 */
private final JdbcTemplate jdbcTemplate;
/**
* 写入一个批次。
*
* @param definition 表定义
* @param rows 行数据
* @return 写入结果
*/
public AddDataBatchWriteResult writeBatch(AddDataTableDefinition definition, List<List<Object>> rows) {
if (rows == null || rows.isEmpty()) {
return new AddDataBatchWriteResult(0L, 0L, 0L, null);
}
try {
return executeInsertIgnore(definition, rows);
} catch (DataAccessException ex) {
log.warn("批量写入失败开始降级为逐行写入table={}, batchSize={}, message={}",
definition.getTableName(), rows.size(), resolveErrorMessage(ex));
return fallbackWriteOneByOne(definition, rows, ex);
}
}
/**
* 执行批量 INSERT IGNORE。
*
* @param definition 表定义
* @param rows 行数据
* @return 写入结果
*/
private AddDataBatchWriteResult executeInsertIgnore(AddDataTableDefinition definition, List<List<Object>> rows) {
String sql = buildInsertIgnoreSql(definition, rows.size());
List<Object> args = flattenRows(rows);
int affectedRows = jdbcTemplate.update(sql, args.toArray(new Object[0]));
long insertedCount = Math.max(affectedRows, 0);
long skippedCount = rows.size() - insertedCount;
return new AddDataBatchWriteResult(insertedCount, skippedCount, 0L, null);
}
/**
* 批量写入失败后逐行降级。
*
* @param definition 表定义
* @param rows 行数据
* @param batchException 原始批量异常
* @return 写入结果
*/
private AddDataBatchWriteResult fallbackWriteOneByOne(AddDataTableDefinition definition, List<List<Object>> rows, DataAccessException batchException) {
long insertedCount = 0L;
long skippedCount = 0L;
long failedCount = 0L;
String firstFailureMessage = null;
String sql = buildInsertIgnoreSql(definition, 1);
for (List<Object> row : rows) {
try {
int affectedRows = jdbcTemplate.update(sql, row.toArray(new Object[0]));
if (affectedRows > 0) {
insertedCount += affectedRows;
} else {
skippedCount++;
}
} catch (DataAccessException ex) {
failedCount++;
if (firstFailureMessage == null) {
firstFailureMessage = resolveErrorMessage(ex);
}
}
}
if (firstFailureMessage == null) {
firstFailureMessage = resolveErrorMessage(batchException);
}
return new AddDataBatchWriteResult(insertedCount, skippedCount, failedCount, firstFailureMessage);
}
/**
* 构建 INSERT IGNORE SQL。
*
* @param definition 表定义
* @param rowCount 行数
* @return SQL 文本
*/
private String buildInsertIgnoreSql(AddDataTableDefinition definition, int rowCount) {
String columnSegment = "`" + String.join("`,`", definition.getColumns()) + "`";
String placeholderSegment = "(" + String.join(",", Collections.nCopies(definition.getColumns().size(), "?")) + ")";
String valuesSegment = String.join(",", Collections.nCopies(rowCount, placeholderSegment));
return "INSERT IGNORE INTO `" + definition.getTableName() + "` (" + columnSegment + ") VALUES " + valuesSegment;
}
/**
* 展平批量参数。
*
* @param rows 行数据
* @return 扁平参数列表
*/
private List<Object> flattenRows(List<List<Object>> rows) {
List<Object> args = new ArrayList<Object>();
for (List<Object> row : rows) {
args.addAll(row);
}
return args;
}
/**
* 提取错误信息。
*
* @param ex 异常
* @return 错误信息
*/
private String resolveErrorMessage(DataAccessException ex) {
Throwable root = ex.getMostSpecificCause();
return root == null ? ex.getMessage() : root.getMessage();
}
}

View File

@@ -0,0 +1,152 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 数据补录表定义注册器。
*/
@Slf4j
@Component
public class AddDataTableRegistry implements InitializingBean {
/** 元数据 SQL 资源。 */
private static final String SCHEMA_RESOURCE_PATH = "sql/add-data/DATA_FLICKER.sql";
/** 建表语句解析表达式。 */
private static final Pattern TABLE_PATTERN = Pattern.compile("CREATE TABLE\\s+`([^`]+)`\\s*\\((.*?)\\)\\s*ENGINE",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
/** 列定义解析表达式。 */
private static final Pattern COLUMN_PATTERN = Pattern.compile("^\\s*`([^`]+)`", Pattern.MULTILINE);
/** 首版统一按 A/B/C/T 四类相别落库。 */
private static final List<String> WRITE_PHASE_CODES = Collections.unmodifiableList(Arrays.asList("A", "B", "C", "T"));
/** 宽表批次大小。 */
private static final int WIDE_TABLE_BATCH_SIZE = 100;
/** 窄表批次大小。 */
private static final int NARROW_TABLE_BATCH_SIZE = 500;
/** 表定义列表。 */
private List<AddDataTableDefinition> tableDefinitions = Collections.emptyList();
@Override
public void afterPropertiesSet() throws Exception {
Map<String, List<String>> columnMap = parseSchemaColumns();
List<AddDataTableDefinition> definitions = new ArrayList<AddDataTableDefinition>();
definitions.add(buildDefinition(columnMap, "data_flicker", AddDataTableDefinition.TimeAxisType.FIXED_TEN_MINUTES, NARROW_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_fluc", AddDataTableDefinition.TimeAxisType.FIXED_TEN_MINUTES, NARROW_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmphasic_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmphasic_v", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmpower_p", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmpower_q", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmpower_s", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmrate_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_harmrate_v", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_inharm_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_plt", AddDataTableDefinition.TimeAxisType.FIXED_TWO_HOURS, NARROW_TABLE_BATCH_SIZE));
definitions.add(buildDefinition(columnMap, "data_v", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE));
tableDefinitions = Collections.unmodifiableList(definitions);
log.info("加载 add-data 表定义完成tableCount={}", tableDefinitions.size());
}
/**
* 返回全部表定义。
*
* @return 表定义列表
*/
public List<AddDataTableDefinition> getTableDefinitions() {
return tableDefinitions;
}
/**
* 根据表名查找表定义。
*
* @param tableName 表名
* @return 表定义
*/
public AddDataTableDefinition getDefinition(String tableName) {
for (AddDataTableDefinition definition : tableDefinitions) {
if (definition.getTableName().equals(tableName)) {
return definition;
}
}
throw new IllegalArgumentException("未找到 add-data 表定义:" + tableName);
}
/**
* 解析 schema SQL 中的列定义。
*
* @return 表名与字段列表映射
*/
private Map<String, List<String>> parseSchemaColumns() {
String schemaText = loadSchemaText();
Matcher tableMatcher = TABLE_PATTERN.matcher(schemaText);
Map<String, List<String>> columnMap = new LinkedHashMap<String, List<String>>();
while (tableMatcher.find()) {
String tableName = tableMatcher.group(1);
String tableBody = tableMatcher.group(2);
Matcher columnMatcher = COLUMN_PATTERN.matcher(tableBody);
List<String> columns = new ArrayList<String>();
while (columnMatcher.find()) {
columns.add(columnMatcher.group(1));
}
columnMap.put(tableName, columns);
}
return columnMap;
}
/**
* 创建单表定义。
*
* @param columnMap 字段映射
* @param tableName 表名
* @param timeAxisType 时间轴类型
* @param batchSize 批次大小
* @return 表定义
*/
private AddDataTableDefinition buildDefinition(Map<String, List<String>> columnMap, String tableName,
AddDataTableDefinition.TimeAxisType timeAxisType, int batchSize) {
List<String> columns = columnMap.get(tableName);
if (columns == null || columns.isEmpty()) {
throw new IllegalStateException("未从 SQL 元数据中解析到表字段:" + tableName);
}
/*
* 当前按用户最新确认13 张表统一支持 A/B/C/T 四类数据类型。
* 如果后续补齐逐表更细的相别映射,只需要在这里调整落库相别集合即可。
*/
return new AddDataTableDefinition(tableName, columns, WRITE_PHASE_CODES, batchSize, timeAxisType);
}
/**
* 加载 schema SQL 文本。
*
* @return SQL 文本
*/
private String loadSchemaText() {
ClassPathResource resource = new ClassPathResource(SCHEMA_RESOURCE_PATH);
try {
return StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new IllegalStateException("读取 add-data SQL 元数据失败:" + SCHEMA_RESOURCE_PATH, e);
}
}
}

View File

@@ -0,0 +1,310 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataBatchWriteResult;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
/**
* 异步任务执行器。
*/
@Slf4j
@Component
public class AddDataTaskExecutor {
/** 多个时间点组成一个处理窗口,兼顾批量效率和内存占用。 */
private static final int TIME_WINDOW_SIZE = 30;
/** 任务线程池。 */
@Qualifier("addDataTaskExecutorService")
private final ExecutorService addDataTaskExecutorService;
/** 表定义注册器。 */
private final AddDataTableRegistry addDataTableRegistry;
/** 时间槽计算器。 */
private final AddDataTimeSlotCalculator addDataTimeSlotCalculator;
/** 数据生成器。 */
private final AddDataValueGenerator addDataValueGenerator;
/** 批量写入组件。 */
private final AddDataBatchWriter addDataBatchWriter;
/** 状态持有器。 */
private final AddDataTaskStatusHolder addDataTaskStatusHolder;
public AddDataTaskExecutor(@Qualifier("addDataTaskExecutorService") ExecutorService addDataTaskExecutorService,
AddDataTableRegistry addDataTableRegistry,
AddDataTimeSlotCalculator addDataTimeSlotCalculator,
AddDataValueGenerator addDataValueGenerator,
AddDataBatchWriter addDataBatchWriter,
AddDataTaskStatusHolder addDataTaskStatusHolder) {
this.addDataTaskExecutorService = addDataTaskExecutorService;
this.addDataTableRegistry = addDataTableRegistry;
this.addDataTimeSlotCalculator = addDataTimeSlotCalculator;
this.addDataValueGenerator = addDataValueGenerator;
this.addDataBatchWriter = addDataBatchWriter;
this.addDataTaskStatusHolder = addDataTaskStatusHolder;
}
/**
* 提交后台任务。
*
* @param taskId 任务编号
* @param command 任务命令
*/
public void submit(String taskId, AddDataTaskCommand command) {
addDataTaskExecutorService.submit(() -> execute(taskId, command));
}
/**
* 执行补数任务。
*
* @param taskId 任务编号
* @param command 任务命令
*/
private void execute(String taskId, AddDataTaskCommand command) {
try {
addDataTaskStatusHolder.markRunning(taskId);
Map<String, List<LocalDateTime>> timeSlotsByTable = buildTimeSlotsByTable(command);
Map<String, Set<LocalDateTime>> timeSlotLookupByTable = buildTimeSlotLookupByTable(timeSlotsByTable);
List<LocalDateTime> mergedTimeSlots = mergeTimeSlots(timeSlotsByTable);
Map<String, Integer> batchNoMap = new HashMap<String, Integer>();
Map<String, List<List<Object>>> pendingRowsByTable = buildPendingRowsByTable();
for (int startIndex = 0; startIndex < mergedTimeSlots.size(); startIndex += TIME_WINDOW_SIZE) {
int endIndex = Math.min(startIndex + TIME_WINDOW_SIZE, mergedTimeSlots.size());
List<LocalDateTime> timeWindow = mergedTimeSlots.subList(startIndex, endIndex);
GeneratedBatchData generatedBatchData = generateBatchData(command, timeWindow, timeSlotLookupByTable);
if (!writeBatchData(taskId, generatedBatchData, pendingRowsByTable, batchNoMap)) {
return;
}
}
if (!flushRemainingBatchData(taskId, pendingRowsByTable, batchNoMap)) {
return;
}
addDataTaskStatusHolder.markSuccess(taskId);
} catch (Exception ex) {
log.error("执行补数任务失败taskId={}", taskId, ex);
addDataTaskStatusHolder.markFailed(taskId, ex.getMessage());
}
}
/**
* 为每张表构建各自命中的时间槽。
*
* @param command 任务命令
* @return 按表名分组的时间槽
*/
private Map<String, List<LocalDateTime>> buildTimeSlotsByTable(AddDataTaskCommand command) {
Map<String, List<LocalDateTime>> result = new LinkedHashMap<String, List<LocalDateTime>>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
int intervalMinutes = definition.resolveIntervalMinutes(command.getIntervalMinutes());
List<LocalDateTime> timeSlots = addDataTimeSlotCalculator.buildTimeSlots(
command.getStartTime(), command.getEndTime(), intervalMinutes);
result.put(definition.getTableName(), timeSlots);
}
return result;
}
/**
* 为每张表补充时间槽查找索引,避免窗口生成时重复线性扫描。
*
* @param timeSlotsByTable 按表名分组的时间槽
* @return 按表名分组的时间槽查找集合
*/
private Map<String, Set<LocalDateTime>> buildTimeSlotLookupByTable(Map<String, List<LocalDateTime>> timeSlotsByTable) {
Map<String, Set<LocalDateTime>> result = new LinkedHashMap<String, Set<LocalDateTime>>();
for (Map.Entry<String, List<LocalDateTime>> entry : timeSlotsByTable.entrySet()) {
result.put(entry.getKey(), new HashSet<LocalDateTime>(entry.getValue()));
}
return result;
}
/**
* 为每张表初始化跨窗口复用的待写入缓存。
*
* @return 按表名分组的待写入缓存
*/
private Map<String, List<List<Object>>> buildPendingRowsByTable() {
Map<String, List<List<Object>>> result = new LinkedHashMap<String, List<List<Object>>>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
result.put(definition.getTableName(), new ArrayList<List<Object>>());
}
return result;
}
/**
* 合并所有表的时间槽,得到统一的自然时间轴。
*
* @param timeSlotsByTable 按表名分组的时间槽
* @return 去重后的统一时间轴
*/
private List<LocalDateTime> mergeTimeSlots(Map<String, List<LocalDateTime>> timeSlotsByTable) {
TreeSet<LocalDateTime> merged = new TreeSet<LocalDateTime>();
for (List<LocalDateTime> timeSlots : timeSlotsByTable.values()) {
merged.addAll(timeSlots);
}
return new ArrayList<LocalDateTime>(merged);
}
/**
* 按多个时间点窗口生成待写入数据,不直接写库。
*
* @param command 任务命令
* @param timeWindow 当前时间窗口
* @param timeSlotLookupByTable 各表命中的时间槽索引
* @return 当前窗口生成结果
*/
private GeneratedBatchData generateBatchData(AddDataTaskCommand command, List<LocalDateTime> timeWindow,
Map<String, Set<LocalDateTime>> timeSlotLookupByTable) {
Map<String, List<List<Object>>> rowsByTable = new LinkedHashMap<String, List<List<Object>>>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
rowsByTable.put(definition.getTableName(), new ArrayList<List<Object>>());
}
for (LocalDateTime timeSlot : timeWindow) {
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
Set<LocalDateTime> tableTimeSlots = timeSlotLookupByTable.get(definition.getTableName());
if (!tableTimeSlots.contains(timeSlot)) {
continue;
}
List<List<Object>> rows = rowsByTable.get(definition.getTableName());
for (String lineId : command.getLineIds()) {
for (String phaseCode : definition.getPhaseCodes()) {
rows.add(addDataValueGenerator.generateRow(definition, lineId, timeSlot, phaseCode));
}
}
}
}
return new GeneratedBatchData(rowsByTable);
}
/**
* 写入当前窗口已经生成的数据。
*
* @param taskId 任务编号
* @param generatedBatchData 当前窗口生成结果
* @param pendingRowsByTable 跨窗口复用的待写入缓存
* @param batchNoMap 每张表的批次号缓存
* @return true 表示继续执行
*/
private boolean writeBatchData(String taskId, GeneratedBatchData generatedBatchData,
Map<String, List<List<Object>>> pendingRowsByTable, Map<String, Integer> batchNoMap) {
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
List<List<Object>> rows = generatedBatchData.getRows(definition.getTableName());
if (rows.isEmpty()) {
continue;
}
List<List<Object>> pendingRows = pendingRowsByTable.get(definition.getTableName());
pendingRows.addAll(rows);
int batchSize = definition.getBatchSize();
while (pendingRows.size() >= batchSize) {
List<List<Object>> batchRows = new ArrayList<List<Object>>(pendingRows.subList(0, batchSize));
int batchNo = nextBatchNo(batchNoMap, definition.getTableName());
if (!flushBatch(taskId, definition, batchRows, batchNo)) {
return false;
}
pendingRows.subList(0, batchSize).clear();
}
}
return true;
}
/**
* 任务结束后刷新所有未满批次的尾批数据。
*
* @param taskId 任务编号
* @param pendingRowsByTable 跨窗口复用的待写入缓存
* @param batchNoMap 每张表的批次号缓存
* @return true 表示继续执行
*/
private boolean flushRemainingBatchData(String taskId, Map<String, List<List<Object>>> pendingRowsByTable,
Map<String, Integer> batchNoMap) {
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
List<List<Object>> pendingRows = pendingRowsByTable.get(definition.getTableName());
if (pendingRows == null || pendingRows.isEmpty()) {
continue;
}
int batchNo = nextBatchNo(batchNoMap, definition.getTableName());
List<List<Object>> batchRows = new ArrayList<List<Object>>(pendingRows);
if (!flushBatch(taskId, definition, batchRows, batchNo)) {
return false;
}
pendingRows.clear();
}
return true;
}
/**
* 获取当前表的下一个批次号。
*
* @param batchNoMap 批次号缓存
* @param tableName 表名
* @return 下一个批次号
*/
private int nextBatchNo(Map<String, Integer> batchNoMap, String tableName) {
Integer currentBatchNo = batchNoMap.get(tableName);
int nextBatchNo = currentBatchNo == null ? 1 : currentBatchNo + 1;
batchNoMap.put(tableName, nextBatchNo);
return nextBatchNo;
}
/**
* 刷新当前批次。
*
* @param taskId 任务编号
* @param definition 表定义
* @param batchRows 批次行
* @param batchNo 批次号
* @return true 表示继续执行
*/
private boolean flushBatch(String taskId, AddDataTableDefinition definition, List<List<Object>> batchRows, int batchNo) {
addDataTaskStatusHolder.updateCurrentBatch(taskId, definition.getTableName(), batchNo, batchRows.size());
AddDataBatchWriteResult writeResult = addDataBatchWriter.writeBatch(definition, batchRows);
addDataTaskStatusHolder.addProgress(taskId, writeResult.getInsertedCount(), writeResult.getSkippedCount(), writeResult.getFailedCount());
if (writeResult.hasFailure()) {
addDataTaskStatusHolder.markFailed(taskId,
"" + definition.getTableName() + "" + batchNo + " 批执行失败:" + writeResult.getFirstFailureMessage());
return false;
}
return true;
}
/**
* 当前窗口生成结果。
*/
private static final class GeneratedBatchData {
/** 按表名分组的待写入行数据。 */
private final Map<String, List<List<Object>>> rowsByTable;
private GeneratedBatchData(Map<String, List<List<Object>>> rowsByTable) {
this.rowsByTable = rowsByTable;
}
/**
* 获取指定表的待写入行。
*
* @param tableName 表名
* @return 待写入行列表
*/
private List<List<Object>> getRows(String tableName) {
List<List<Object>> rows = rowsByTable.get(tableName);
return rows == null ? Collections.emptyList() : rows;
}
}
}

View File

@@ -0,0 +1,240 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand;
import com.njcn.gather.tool.adddata.pojo.enums.AddDataTaskStatusEnum;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
import com.njcn.gather.tool.adddata.util.AddDataDateTimeUtil;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 任务状态持有器。
*/
@Component
public class AddDataTaskStatusHolder {
/** 任务记录缓存。 */
private final ConcurrentMap<String, TaskRecord> taskRecordMap = new ConcurrentHashMap<String, TaskRecord>();
/** 时间槽计算组件。 */
private final AddDataTimeSlotCalculator addDataTimeSlotCalculator;
public AddDataTaskStatusHolder(AddDataTimeSlotCalculator addDataTimeSlotCalculator) {
this.addDataTimeSlotCalculator = addDataTimeSlotCalculator;
}
/**
* 创建等待中的任务。
*
* @param command 任务命令
* @return 任务状态快照
*/
public AddDataTaskStatusVO createWaitingTask(AddDataTaskCommand command) {
String taskId = UUID.randomUUID().toString().replace("-", "");
TaskRecord record = new TaskRecord(taskId, command, buildHourlyTimeResults(command));
taskRecordMap.put(taskId, record);
return copySnapshot(record);
}
/**
* 标记为运行中。
*
* @param taskId 任务编号
*/
public void markRunning(String taskId) {
TaskRecord record = requireRecord(taskId);
synchronized (record) {
record.status = AddDataTaskStatusEnum.RUNNING;
record.startTime = LocalDateTime.now();
record.currentBatchInfo = "任务已启动";
}
}
/**
* 更新当前批次信息。
*
* @param taskId 任务编号
* @param tableName 当前表名
* @param batchNo 批次号
* @param batchSize 批次大小
*/
public void updateCurrentBatch(String taskId, String tableName, int batchNo, int batchSize) {
TaskRecord record = requireRecord(taskId);
synchronized (record) {
record.currentTableName = tableName;
record.currentBatchInfo = "" + batchNo + " 批," + batchSize + "";
}
}
/**
* 累加执行统计。
*
* @param taskId 任务编号
* @param insertedCount 新增成功数
* @param skippedCount 跳过数
* @param failedCount 失败数
*/
public void addProgress(String taskId, long insertedCount, long skippedCount, long failedCount) {
TaskRecord record = requireRecord(taskId);
synchronized (record) {
record.insertedCount += insertedCount;
record.skippedCount += skippedCount;
record.failedCount += failedCount;
}
}
/**
* 标记任务成功。
*
* @param taskId 任务编号
*/
public void markSuccess(String taskId) {
TaskRecord record = requireRecord(taskId);
synchronized (record) {
record.status = AddDataTaskStatusEnum.SUCCESS;
record.endTime = LocalDateTime.now();
record.currentBatchInfo = "执行完成";
}
}
/**
* 标记任务失败。
*
* @param taskId 任务编号
* @param failureReason 失败原因
*/
public void markFailed(String taskId, String failureReason) {
TaskRecord record = requireRecord(taskId);
synchronized (record) {
record.status = AddDataTaskStatusEnum.FAILED;
record.endTime = LocalDateTime.now();
record.failureReason = failureReason;
record.currentBatchInfo = "执行失败";
}
}
/**
* 查询任务状态。
*
* @param taskId 任务编号
* @return 状态快照
*/
public AddDataTaskStatusVO getStatus(String taskId) {
return copySnapshot(requireRecord(taskId));
}
/**
* 获取任务命令。
*
* @param taskId 任务编号
* @return 任务命令
*/
public AddDataTaskCommand getCommand(String taskId) {
return requireRecord(taskId).command;
}
/**
* 获取全部任务编号。
*
* @return 任务编号列表
*/
public List<String> getTaskIds() {
return new ArrayList<String>(taskRecordMap.keySet());
}
/**
* 复制状态快照。
*
* @param record 任务记录
* @return 状态快照
*/
private AddDataTaskStatusVO copySnapshot(TaskRecord record) {
synchronized (record) {
AddDataTaskStatusVO status = new AddDataTaskStatusVO();
status.setTaskId(record.taskId);
status.setStatus(record.status.name());
status.setCurrentTableName(record.currentTableName);
status.setCurrentBatchInfo(record.currentBatchInfo);
status.setInsertedCount(record.insertedCount);
status.setSkippedCount(record.skippedCount);
status.setFailedCount(record.failedCount);
status.setFailureReason(record.failureReason);
status.setHourlyTimeResults(new ArrayList<String>(record.hourlyTimeResults));
status.setStartTime(AddDataDateTimeUtil.format(record.startTime));
status.setEndTime(AddDataDateTimeUtil.format(record.endTime));
return status;
}
}
/**
* 根据任务时间范围生成前端展示的每小时时刻。
*
* @param command 任务命令
* @return 每小时时刻文本
*/
private List<String> buildHourlyTimeResults(AddDataTaskCommand command) {
List<LocalDateTime> timeSlots = addDataTimeSlotCalculator.buildHourlyTimeSlots(command.getStartTime(), command.getEndTime());
List<String> result = new ArrayList<String>(timeSlots.size());
for (LocalDateTime timeSlot : timeSlots) {
result.add(AddDataDateTimeUtil.format(timeSlot));
}
return result;
}
/**
* 获取任务记录。
*
* @param taskId 任务编号
* @return 任务记录
*/
private TaskRecord requireRecord(String taskId) {
TaskRecord record = taskRecordMap.get(taskId);
if (record == null) {
throw new IllegalArgumentException("未找到补数任务:" + taskId);
}
return record;
}
/**
* 内存态任务记录。
*/
private static final class TaskRecord {
/** 任务编号。 */
private final String taskId;
/** 任务命令。 */
private final AddDataTaskCommand command;
/** 每小时返回给前端展示的业务时刻。 */
private final List<String> hourlyTimeResults;
/** 当前状态。 */
private AddDataTaskStatusEnum status = AddDataTaskStatusEnum.WAITING;
/** 当前表名。 */
private String currentTableName;
/** 当前批次。 */
private String currentBatchInfo = "等待执行";
/** 成功数。 */
private long insertedCount;
/** 跳过数。 */
private long skippedCount;
/** 失败数。 */
private long failedCount;
/** 失败原因。 */
private String failureReason;
/** 开始时间。 */
private LocalDateTime startTime;
/** 结束时间。 */
private LocalDateTime endTime;
private TaskRecord(String taskId, AddDataTaskCommand command, List<String> hourlyTimeResults) {
this.taskId = taskId;
this.command = command;
this.hourlyTimeResults = hourlyTimeResults;
}
}
}

View File

@@ -0,0 +1,97 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* 前端展示模板注册器。
*/
@Component
public class AddDataTemplateRegistry {
/** 模板列表。 */
private final List<AddDataTemplateVO> templates;
public AddDataTemplateRegistry() {
List<AddDataTemplateVO> result = new ArrayList<AddDataTemplateVO>();
result.add(createTemplate("电压有效值", "data_v", "ABC", Arrays.asList("A", "B", "C"),
"RMS_MAX", "RMS_MIN", "主值字段 RMS", "RMS_CP95"));
result.add(createTemplate("线电压有效值", "data_v", "T", Arrays.asList("T"),
"RMSAB/RMSBC/RMSCA 对应 MAX 字段", "RMSAB/RMSBC/RMSCA 对应 MIN 字段", "主值字段 RMSAB/RMSBC/RMSCA", "RMSAB/RMSBC/RMSCA 对应 CP95 字段"));
result.add(createTemplate("频率", "data_v", "T", Arrays.asList("T"),
"FREQ_MAX", "FREQ_MIN", "主值字段 FREQ", "FREQ_CP95"));
result.add(createTemplate("电压总谐波畸变率", "data_v", "ABC", Arrays.asList("A", "B", "C"),
"V_THD_MAX", "V_THD_MIN", "主值字段 V_THD", "V_THD_CP95"));
result.add(createTemplate("电流有效值", "data_i", "ABC", Arrays.asList("A", "B", "C"),
"RMS_MAX", "RMS_MIN", "主值字段 RMS", "RMS_CP95"));
result.add(createTemplate("电流总谐波畸变率", "data_i", "ABC", Arrays.asList("A", "B", "C"),
"I_THD_MAX", "I_THD_MIN", "主值字段 I_THD", "I_THD_CP95"));
result.add(createTemplate("电压谐波幅值", "data_harmphasic_v", "ABC", Arrays.asList("A", "B", "C"),
"各次 V_n_MAX", "各次 V_n_MIN", "主值字段 V_n", "各次 V_n_CP95"));
result.add(createTemplate("电流谐波幅值", "data_harmphasic_i", "ABC", Arrays.asList("A", "B", "C"),
"各次 I_n_MAX", "各次 I_n_MIN", "主值字段 I_n", "各次 I_n_CP95"));
result.add(createTemplate("电压谐波含有率", "data_harmrate_v", "ABC", Arrays.asList("A", "B", "C"),
"各次 V_n_MAX", "各次 V_n_MIN", "主值字段 V_n", "各次 V_n_CP95"));
result.add(createTemplate("电流谐波含有率", "data_harmrate_i", "ABC", Arrays.asList("A", "B", "C"),
"各次 I_n_MAX", "各次 I_n_MIN", "主值字段 I_n", "各次 I_n_CP95"));
result.add(createTemplate("间谐波电流", "data_inharm_i", "ABC", Arrays.asList("A", "B", "C"),
"各次 I_n_MAX", "各次 I_n_MIN", "主值字段 I_n", "各次 I_n_CP95"));
result.add(createTemplate("有功谐波功率", "data_harmpower_p", "ABC", Arrays.asList("A", "B", "C"),
"P_MAX 与各次 P_n_MAX", "P_MIN 与各次 P_n_MIN", "主值字段 P / P_n", "P_CP95 与各次 P_n_CP95"));
result.add(createTemplate("无功谐波功率", "data_harmpower_q", "ABC", Arrays.asList("A", "B", "C"),
"Q_MAX 与各次 Q_n_MAX", "Q_MIN 与各次 Q_n_MIN", "主值字段 Q / Q_n", "Q_CP95 与各次 Q_n_CP95"));
result.add(createTemplate("视在谐波功率", "data_harmpower_s", "ABC", Arrays.asList("A", "B", "C"),
"S_MAX 与各次 S_n_MAX", "S_MIN 与各次 S_n_MIN", "主值字段 S / S_n", "S_CP95 与各次 S_n_CP95"));
result.add(createTemplate("电压波动", "data_fluc", "T", Arrays.asList("T"),
"当前表无独立最大值字段", "当前表无独立最小值字段", "主值字段 FLUC", "当前表无独立 CP95 字段"));
result.add(createTemplate("短时闪变", "data_flicker", "T", Arrays.asList("T"),
"当前表无独立最大值字段", "当前表无独立最小值字段", "主值字段 PST", "当前表无独立 CP95 字段"));
result.add(createTemplate("长时闪变", "data_plt", "T", Arrays.asList("T"),
"当前表无独立最大值字段", "当前表无独立最小值字段", "主值字段 PLT", "当前表无独立 CP95 字段"));
templates = Collections.unmodifiableList(result);
}
/**
* 返回模板列表。
*
* @return 模板列表
*/
public List<AddDataTemplateVO> getTemplates() {
return templates;
}
/**
* 创建模板对象。
*
* @param parameterName 参数名称
* @param tableName 表名
* @param phaseDisplay 展示相别
* @param phaseCodes 落库相别
* @param maxRule 最大值规则
* @param minRule 最小值规则
* @param averageRule 平均值规则
* @param cp95Rule cp95 规则
* @return 模板对象
*/
private AddDataTemplateVO createTemplate(String parameterName, String tableName, String phaseDisplay, List<String> phaseCodes,
String maxRule, String minRule, String averageRule, String cp95Rule) {
AddDataTemplateVO template = new AddDataTemplateVO();
template.setParameterName(parameterName);
template.setTableName(tableName);
template.setPhaseDisplay(phaseDisplay);
template.setPhaseCodes(new ArrayList<String>(phaseCodes));
template.setDisplay(true);
template.setShowQualified(true);
template.setMaxValueRule(maxRule);
template.setMinValueRule(minRule);
template.setAverageValueRule(averageRule);
template.setCp95ValueRule(cp95Rule);
template.setDecimalScale(2);
return template;
}
}

View File

@@ -0,0 +1,77 @@
package com.njcn.gather.tool.adddata.component;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
/**
* 时间槽计算组件。
*/
@Component
public class AddDataTimeSlotCalculator {
/**
* 构建自然时间槽列表。
*
* @param startTime 开始时间
* @param endTime 结束时间
* @param intervalMinutes 步长,单位分钟
* @return 时间槽列表
*/
public List<LocalDateTime> buildTimeSlots(LocalDateTime startTime, LocalDateTime endTime, int intervalMinutes) {
if (intervalMinutes <= 0) {
throw new IllegalArgumentException("时间步长必须大于 0");
}
List<LocalDateTime> result = new ArrayList<LocalDateTime>();
LocalDateTime cursor = alignToNextSlot(startTime, intervalMinutes);
while (!cursor.isAfter(endTime)) {
result.add(cursor);
cursor = cursor.plusMinutes(intervalMinutes);
}
return result;
}
/**
* 构建返回给前端展示的每小时自然时刻列表。
*
* @param startTime 开始时间
* @param endTime 结束时间
* @return 每小时自然时刻列表
*/
public List<LocalDateTime> buildHourlyTimeSlots(LocalDateTime startTime, LocalDateTime endTime) {
return buildTimeSlots(startTime, endTime, 60);
}
/**
* 计算时间槽数量。
*
* @param startTime 开始时间
* @param endTime 结束时间
* @param intervalMinutes 步长
* @return 时间槽数量
*/
public long countTimeSlots(LocalDateTime startTime, LocalDateTime endTime, int intervalMinutes) {
return buildTimeSlots(startTime, endTime, intervalMinutes).size();
}
/**
* 向上对齐到最近的自然时间槽。
*
* @param time 原始时间
* @param intervalMinutes 步长
* @return 对齐后的时间
*/
public LocalDateTime alignToNextSlot(LocalDateTime time, int intervalMinutes) {
LocalDateTime minuteFloor = time.truncatedTo(ChronoUnit.MINUTES);
int minuteOfDay = minuteFloor.getHour() * 60 + minuteFloor.getMinute();
int remainder = minuteOfDay % intervalMinutes;
LocalDateTime aligned = minuteFloor.minusMinutes(remainder);
if (remainder != 0 || !minuteFloor.equals(time)) {
aligned = aligned.plusMinutes(intervalMinutes);
}
return aligned;
}
}

View File

@@ -0,0 +1,579 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SplittableRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 电能质量数据生成器。
*/
@Component
public class AddDataValueGenerator {
/** 谐波列匹配。 */
private static final Pattern HARMONIC_PATTERN = Pattern.compile("^([VIQPS])_(\\d+)$");
/** 基础列后缀。 */
private static final String SUFFIX_MAX = "_MAX";
private static final String SUFFIX_MIN = "_MIN";
private static final String SUFFIX_CP95 = "_CP95";
/** 两倍 PI。 */
private static final double TWO_PI = Math.PI * 2D;
/**
* 生成一行落库值。
*
* @param definition 表定义
* @param lineId 监测点 ID
* @param timeId 时间点
* @param phaseCode 相别
* @return 与字段顺序一致的值列表
*/
public List<Object> generateRow(AddDataTableDefinition definition, String lineId, LocalDateTime timeId, String phaseCode) {
MeasurementState state = buildState(lineId, timeId, phaseCode);
Map<String, Double> baseValues = buildBaseValues(definition, state);
List<Object> row = new ArrayList<Object>(definition.getColumns().size());
for (String column : definition.getColumns()) {
if ("TIMEID".equals(column)) {
row.add(Timestamp.valueOf(timeId));
continue;
}
if ("LINEID".equals(column)) {
row.add(lineId);
continue;
}
if ("PHASIC_TYPE".equals(column)) {
row.add(phaseCode);
continue;
}
if ("QUALITYFLAG".equals(column)) {
row.add(1);
continue;
}
row.add(resolveColumnValue(definition.getTableName(), column, baseValues, state));
}
return row;
}
/**
* 按列名解析字段值。
*
* @param tableName 表名
* @param column 列名
* @param baseValues 主值集合
* @param state 基础状态
* @return 对应值
*/
private Double resolveColumnValue(String tableName, String column, Map<String, Double> baseValues, MeasurementState state) {
if (baseValues.containsKey(column)) {
return baseValues.get(column);
}
if (column.endsWith(SUFFIX_MAX)) {
return deriveMetric(baseValues.get(removeSuffix(column, SUFFIX_MAX)), column, state, MetricType.MAX);
}
if (column.endsWith(SUFFIX_MIN)) {
return deriveMetric(baseValues.get(removeSuffix(column, SUFFIX_MIN)), column, state, MetricType.MIN);
}
if (column.endsWith(SUFFIX_CP95)) {
return deriveMetric(baseValues.get(removeSuffix(column, SUFFIX_CP95)), column, state, MetricType.CP95);
}
throw new IllegalStateException("未支持的 add-data 列:" + tableName + "." + column);
}
/**
* 生成主值列集合。
*
* @param definition 表定义
* @param state 基础状态
* @return 主值集合
*/
private Map<String, Double> buildBaseValues(AddDataTableDefinition definition, MeasurementState state) {
Map<String, Double> values = new HashMap<String, Double>();
for (String column : definition.getColumns()) {
if (isInfrastructureColumn(column) || isDerivedColumn(column)) {
continue;
}
values.put(column, resolveBaseMetric(definition.getTableName(), column, state));
}
return values;
}
/**
* 解析基础主值。
*
* @param tableName 表名
* @param column 列名
* @param state 基础状态
* @return 主值
*/
private Double resolveBaseMetric(String tableName, String column, MeasurementState state) {
Matcher matcher = HARMONIC_PATTERN.matcher(column);
if (matcher.matches()) {
String prefix = matcher.group(1);
int order = Integer.parseInt(matcher.group(2));
if ("V".equals(prefix)) {
if ("data_harmrate_v".equals(tableName)) {
return resolveRatioPercent(state.vHarmonics[order], state.vHarmonics[1]);
}
return state.vHarmonics[order];
}
if ("I".equals(prefix)) {
if ("data_harmrate_i".equals(tableName)) {
return resolveRatioPercent(state.iHarmonics[order], state.iHarmonics[1]);
}
if ("data_inharm_i".equals(tableName)) {
return state.iInharmonics[order];
}
return state.iHarmonics[order];
}
if ("P".equals(prefix)) {
return state.pHarmonics[order];
}
if ("Q".equals(prefix)) {
return state.qHarmonics[order];
}
if ("S".equals(prefix)) {
return state.sHarmonics[order];
}
}
switch (column) {
case "RMS":
return "data_v".equals(tableName) ? state.phaseVoltage : state.currentRms;
case "RMSAB":
return state.lineVoltageAB;
case "RMSBC":
return state.lineVoltageBC;
case "RMSCA":
return state.lineVoltageCA;
case "VU_DEV":
return state.vUpperDeviation;
case "VL_DEV":
return state.vLowerDeviation;
case "FREQ":
return state.frequency;
case "FREQ_DEV":
return state.frequencyDeviation;
case "V_UNBALANCE":
return state.vUnbalance;
case "V_POS":
return state.vPositive;
case "V_NEG":
return state.vNegative;
case "V_ZERO":
return state.vZero;
case "V_THD":
return state.vThd;
case "I_UNBALANCE":
return state.iUnbalance;
case "I_POS":
return state.iPositive;
case "I_NEG":
return state.iNegative;
case "I_ZERO":
return state.iZero;
case "I_THD":
return state.iThd;
case "PF":
return state.powerFactor;
case "DF":
return state.distortionFactor;
case "P":
return state.activePower;
case "Q":
return state.reactivePower;
case "S":
return state.apparentPower;
case "FLUC":
return state.fluc;
case "PST":
return state.pst;
case "PLT":
return state.plt;
case "FLUCCF":
return state.flucCf;
default:
throw new IllegalStateException("未支持的基础指标列:" + tableName + "." + column);
}
}
/**
* 生成主值派生指标。
*
* @param baseValue 主值
* @param column 派生列名
* @param state 基础状态
* @param metricType 派生类型
* @return 派生值
*/
private Double deriveMetric(Double baseValue, String column, MeasurementState state, MetricType metricType) {
if (baseValue == null) {
throw new IllegalStateException("派生字段缺少主值:" + column);
}
double factor = noise(state.sharedSeed + column.hashCode(), 0.01D, 0.05D);
double delta = Math.max(Math.abs(baseValue) * factor, 0.005D);
double value;
if (MetricType.MAX.equals(metricType)) {
value = baseValue + delta;
} else if (MetricType.MIN.equals(metricType)) {
value = baseValue - delta;
if (baseValue >= 0D) {
value = Math.max(0D, value);
}
} else {
value = baseValue + delta * 0.6D;
}
return round(value, 4);
}
/**
* 构建同源基础状态。
*
* @param lineId 监测点 ID
* @param timeId 时间
* @param phaseCode 相别
* @return 基础状态
*/
private MeasurementState buildState(String lineId, LocalDateTime timeId, String phaseCode) {
long sharedSeed = Objects.hash(lineId, timeId.getYear(), timeId.getDayOfYear(), timeId.getHour(), timeId.getMinute());
long phaseSeed = Objects.hash(sharedSeed, phaseCode);
double minuteOfDay = timeId.getHour() * 60D + timeId.getMinute();
double dayWave = Math.sin(minuteOfDay / 1440D * TWO_PI);
double fastWave = Math.cos(minuteOfDay / 240D * TWO_PI);
double phaseWave = Math.sin(minuteOfDay / 1440D * TWO_PI + resolvePhaseOffset(phaseCode));
double phaseVoltage = round(220D + dayWave * 3.6D + phaseWave * 1.5D + noise(phaseSeed + 11L, -0.9D, 0.9D), 4);
double lineVoltageAB = round(380D + dayWave * 4.8D + noise(sharedSeed + 21L, -1.2D, 1.2D), 4);
double lineVoltageBC = round(381D + fastWave * 3.6D + noise(sharedSeed + 22L, -1.2D, 1.2D), 4);
double lineVoltageCA = round(379D + Math.sin(minuteOfDay / 720D * TWO_PI) * 4.2D + noise(sharedSeed + 23L, -1.2D, 1.2D), 4);
double frequency = round(50D + Math.sin(minuteOfDay / 180D * TWO_PI) * 0.03D + noise(sharedSeed + 31L, -0.01D, 0.01D), 4);
double frequencyDeviation = round(frequency - 50D, 4);
double currentRms = round(85D + fastWave * 12D + phaseWave * 6D + noise(phaseSeed + 41L, -2D, 2D), 4);
double vPositive = round(phaseVoltage * (0.992D + noise(sharedSeed + 51L, -0.003D, 0.003D)), 4);
double vNegative = round(Math.max(0D, phaseVoltage * (0.008D + noise(phaseSeed + 52L, 0D, 0.006D))), 4);
double vZero = round(Math.max(0D, phaseVoltage * (0.006D + noise(phaseSeed + 53L, 0D, 0.004D))), 4);
double vUnbalance = resolveRatioPercent(vNegative, vPositive);
double vUpperDeviation = round(Math.max((phaseVoltage - 220D) / 220D * 100D, 0D), 4);
double vLowerDeviation = round(Math.max((220D - phaseVoltage) / 220D * 100D, 0D), 4);
double iPositive = round(currentRms * (0.986D + noise(sharedSeed + 61L, -0.003D, 0.003D)), 4);
double iNegative = round(Math.max(0D, currentRms * (0.016D + noise(phaseSeed + 62L, 0D, 0.01D))), 4);
double iZero = round(Math.max(0D, currentRms * (0.011D + noise(phaseSeed + 63L, 0D, 0.008D))), 4);
double iUnbalance = resolveRatioPercent(iNegative, iPositive);
double[] vHarmonics = new double[51];
double[] iHarmonics = new double[51];
double[] iInharmonics = new double[51];
vHarmonics[1] = phaseVoltage;
iHarmonics[1] = currentRms;
iInharmonics[1] = round(currentRms * 0.06D, 4);
for (int order = 2; order <= 50; order++) {
double voltageRatio = (0.015D / order) * (1D + noise(sharedSeed + order, -0.35D, 0.35D));
double currentRatio = (0.08D / Math.sqrt(order)) * (1D + noise(phaseSeed + order, -0.4D, 0.4D));
vHarmonics[order] = round(Math.max(0D, phaseVoltage * voltageRatio), 4);
iHarmonics[order] = round(Math.max(0D, currentRms * currentRatio), 4);
iInharmonics[order] = round(Math.max(0D, iHarmonics[order] * noise(phaseSeed + order * 13L, 0.08D, 0.18D)), 4);
}
double vThd = round(resolveThd(vHarmonics), 4);
double iThd = round(resolveThd(iHarmonics), 4);
double[] pHarmonics = new double[51];
double[] qHarmonics = new double[51];
double[] sHarmonics = new double[51];
double activePower = 0D;
double reactivePower = 0D;
double apparentPower = 0D;
for (int order = 1; order <= 50; order++) {
double apparent = Math.max(0D, round(vHarmonics[order] * iHarmonics[order] / 1000D, 4));
double angle = 0.16D + order * 0.008D + noise(sharedSeed + order * 17L, -0.03D, 0.03D);
double active = round(apparent * Math.cos(angle), 4);
double reactive = round(apparent * Math.sin(angle), 4);
pHarmonics[order] = active;
qHarmonics[order] = reactive;
sHarmonics[order] = round(Math.sqrt(active * active + reactive * reactive), 4);
activePower += active;
reactivePower += reactive;
apparentPower += sHarmonics[order];
}
activePower = round(activePower, 4);
reactivePower = round(reactivePower, 4);
apparentPower = round(apparentPower, 4);
double powerFactor = apparentPower == 0D ? 1D : round(clamp(activePower / apparentPower, -1D, 1D), 4);
double distortionFactor = round(1D / Math.sqrt(1D + Math.pow(vThd / 100D, 2D) + Math.pow(iThd / 100D, 2D)), 4);
double fluc = round(0.35D + Math.abs(dayWave) * 0.18D + noise(sharedSeed + 71L, -0.03D, 0.03D), 4);
double pst = round(fluc * 1.12D + 0.08D + noise(sharedSeed + 72L, -0.02D, 0.02D), 4);
double plt = round(Math.max(0.01D, pst * 0.92D + 0.05D + noise(sharedSeed + 73L, -0.02D, 0.02D)), 4);
double flucCf = round(fluc * (0.96D + noise(sharedSeed + 74L, -0.03D, 0.03D)), 4);
return new MeasurementState(sharedSeed, phaseVoltage, lineVoltageAB, lineVoltageBC, lineVoltageCA,
frequency, frequencyDeviation, currentRms, vPositive, vNegative, vZero, vUnbalance,
vUpperDeviation, vLowerDeviation, iPositive, iNegative, iZero, iUnbalance,
vHarmonics, iHarmonics, iInharmonics, vThd, iThd, pHarmonics, qHarmonics, sHarmonics,
activePower, reactivePower, apparentPower, powerFactor, distortionFactor, fluc, pst, plt, flucCf);
}
/**
* 解析百分比。
*
* @param part 分子
* @param total 分母
* @return 百分比
*/
private double resolveRatioPercent(double part, double total) {
if (total == 0D) {
return 0D;
}
return round(part / total * 100D, 4);
}
/**
* 解析 THD。
*
* @param harmonics 谐波数组
* @return thd 百分比
*/
private double resolveThd(double[] harmonics) {
if (harmonics[1] == 0D) {
return 0D;
}
double sum = 0D;
for (int order = 2; order < harmonics.length; order++) {
sum += harmonics[order] * harmonics[order];
}
return Math.sqrt(sum) / harmonics[1] * 100D;
}
/**
* 解析相别偏移角。
*
* @param phaseCode 相别
* @return 偏移角
*/
private double resolvePhaseOffset(String phaseCode) {
if ("B".equals(phaseCode)) {
return TWO_PI / 3D;
}
if ("C".equals(phaseCode)) {
return TWO_PI / 3D * 2D;
}
return 0D;
}
/**
* 生成稳定随机扰动。
*
* @param seed 种子
* @param min 最小值
* @param max 最大值
* @return 扰动值
*/
private double noise(long seed, double min, double max) {
return min + (max - min) * new SplittableRandom(seed).nextDouble();
}
/**
* 截断小数位。
*
* @param value 原始值
* @param scale 保留位数
* @return 处理后的值
*/
private double round(double value, int scale) {
if (Math.abs(value) < 0.0000001D) {
return 0D;
}
return BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_UP).doubleValue();
}
/**
* 移除列后缀。
*
* @param column 列名
* @param suffix 后缀
* @return 主值列名
*/
private String removeSuffix(String column, String suffix) {
return column.substring(0, column.length() - suffix.length());
}
/**
* 判断是否为系统字段。
*
* @param column 列名
* @return true 表示系统字段
*/
private boolean isInfrastructureColumn(String column) {
return "TIMEID".equals(column) || "LINEID".equals(column) || "PHASIC_TYPE".equals(column) || "QUALITYFLAG".equals(column);
}
/**
* 判断是否为派生列。
*
* @param column 列名
* @return true 表示派生列
*/
private boolean isDerivedColumn(String column) {
return column.endsWith(SUFFIX_MAX) || column.endsWith(SUFFIX_MIN) || column.endsWith(SUFFIX_CP95);
}
/**
* 数值类型。
*/
private enum MetricType {
/** 最大值。 */
MAX,
/** 最小值。 */
MIN,
/** cp95。 */
CP95
}
/**
* 数值约束。
*
* @param value 值
* @param min 最小值
* @param max 最大值
* @return 限幅结果
*/
private double clamp(double value, double min, double max) {
return Math.max(min, Math.min(max, value));
}
/**
* 同源基础状态。
*/
private static final class MeasurementState {
/** 共享种子。 */
private final long sharedSeed;
/** 电压有效值。 */
private final double phaseVoltage;
/** AB 线电压。 */
private final double lineVoltageAB;
/** BC 线电压。 */
private final double lineVoltageBC;
/** CA 线电压。 */
private final double lineVoltageCA;
/** 频率。 */
private final double frequency;
/** 频偏。 */
private final double frequencyDeviation;
/** 电流有效值。 */
private final double currentRms;
/** 电压正序。 */
private final double vPositive;
/** 电压负序。 */
private final double vNegative;
/** 电压零序。 */
private final double vZero;
/** 电压不平衡度。 */
private final double vUnbalance;
/** 电压上偏差。 */
private final double vUpperDeviation;
/** 电压下偏差。 */
private final double vLowerDeviation;
/** 电流正序。 */
private final double iPositive;
/** 电流负序。 */
private final double iNegative;
/** 电流零序。 */
private final double iZero;
/** 电流不平衡度。 */
private final double iUnbalance;
/** 电压谐波。 */
private final double[] vHarmonics;
/** 电流谐波。 */
private final double[] iHarmonics;
/** 间谐波电流。 */
private final double[] iInharmonics;
/** 电压 thd。 */
private final double vThd;
/** 电流 thd。 */
private final double iThd;
/** 有功谐波功率。 */
private final double[] pHarmonics;
/** 无功谐波功率。 */
private final double[] qHarmonics;
/** 视在谐波功率。 */
private final double[] sHarmonics;
/** 总有功。 */
private final double activePower;
/** 总无功。 */
private final double reactivePower;
/** 总视在。 */
private final double apparentPower;
/** 功率因数。 */
private final double powerFactor;
/** 畸变因数。 */
private final double distortionFactor;
/** 波动值。 */
private final double fluc;
/** 短时闪变。 */
private final double pst;
/** 长时闪变。 */
private final double plt;
/** 波动修正值。 */
private final double flucCf;
private MeasurementState(long sharedSeed, double phaseVoltage, double lineVoltageAB, double lineVoltageBC, double lineVoltageCA,
double frequency, double frequencyDeviation, double currentRms, double vPositive, double vNegative,
double vZero, double vUnbalance, double vUpperDeviation, double vLowerDeviation, double iPositive,
double iNegative, double iZero, double iUnbalance, double[] vHarmonics, double[] iHarmonics,
double[] iInharmonics, double vThd, double iThd, double[] pHarmonics, double[] qHarmonics,
double[] sHarmonics, double activePower, double reactivePower, double apparentPower,
double powerFactor, double distortionFactor, double fluc, double pst, double plt, double flucCf) {
this.sharedSeed = sharedSeed;
this.phaseVoltage = phaseVoltage;
this.lineVoltageAB = lineVoltageAB;
this.lineVoltageBC = lineVoltageBC;
this.lineVoltageCA = lineVoltageCA;
this.frequency = frequency;
this.frequencyDeviation = frequencyDeviation;
this.currentRms = currentRms;
this.vPositive = vPositive;
this.vNegative = vNegative;
this.vZero = vZero;
this.vUnbalance = vUnbalance;
this.vUpperDeviation = vUpperDeviation;
this.vLowerDeviation = vLowerDeviation;
this.iPositive = iPositive;
this.iNegative = iNegative;
this.iZero = iZero;
this.iUnbalance = iUnbalance;
this.vHarmonics = vHarmonics;
this.iHarmonics = iHarmonics;
this.iInharmonics = iInharmonics;
this.vThd = vThd;
this.iThd = iThd;
this.pHarmonics = pHarmonics;
this.qHarmonics = qHarmonics;
this.sHarmonics = sHarmonics;
this.activePower = activePower;
this.reactivePower = reactivePower;
this.apparentPower = apparentPower;
this.powerFactor = powerFactor;
this.distortionFactor = distortionFactor;
this.fluc = fluc;
this.pst = pst;
this.plt = plt;
this.flucCf = flucCf;
}
}
}

View File

@@ -0,0 +1,37 @@
package com.njcn.gather.tool.adddata.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* add-data 模块任务线程池配置。
*/
@Slf4j
@Configuration
public class AddDataExecutorConfig {
@Bean(name = "addDataTaskExecutorService", destroyMethod = "shutdown")
public ExecutorService addDataTaskExecutorService() {
AtomicInteger threadIndex = new AtomicInteger(1);
return new ThreadPoolExecutor(
1,
1,
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(8),
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("add-data-task-" + threadIndex.getAndIncrement());
return thread;
},
(runnable, executor) -> log.warn("数据补录任务线程池已满,拒绝新的补数任务")
);
}
}

View File

@@ -0,0 +1,90 @@
package com.njcn.gather.tool.adddata.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.LogUtil;
import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
import com.njcn.gather.tool.adddata.service.AddDataTaskService;
import com.njcn.web.controller.BaseController;
import com.njcn.web.utils.HttpResultUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 数据补录任务接口。
*/
@Validated
@Slf4j
@Api(tags = "数据补录任务")
@RestController
@RequestMapping("/addData/task")
@RequiredArgsConstructor
public class AddDataTaskController extends BaseController {
/** 数据补录任务服务。 */
private final AddDataTaskService addDataTaskService;
/**
* 预估本次补数规模。
*
* @param param 补数参数
* @return 预估结果
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("预估电能质量批量补数规模")
@PostMapping("/preview")
public HttpResult<AddDataPreviewVO> preview(@RequestBody @Validated AddDataTaskRequestParam param) {
String methodDescribe = getMethodDescribe("preview");
LogUtil.njcnDebug(log, "{}开始预估补数规模lineCount={}, intervalMinutes={}",
methodDescribe, param.getLineIds() == null ? 0 : param.getLineIds().size(), param.getIntervalMinutes());
AddDataPreviewVO result = addDataTaskService.preview(param);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
/**
* 创建后台补数任务。
*
* @param param 补数参数
* @return 任务编号
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("创建电能质量批量补数任务")
@PostMapping("/create")
public HttpResult<AddDataTaskCreateVO> create(@RequestBody @Validated AddDataTaskRequestParam param) {
String methodDescribe = getMethodDescribe("create");
LogUtil.njcnDebug(log, "{}开始创建补数任务lineCount={}, intervalMinutes={}",
methodDescribe, param.getLineIds() == null ? 0 : param.getLineIds().size(), param.getIntervalMinutes());
AddDataTaskCreateVO result = addDataTaskService.create(param);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
/**
* 查询任务状态。
*
* @param taskId 任务编号
* @return 当前任务状态
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("查询电能质量批量补数任务状态")
@GetMapping("/status/{taskId}")
public HttpResult<AddDataTaskStatusVO> status(@PathVariable("taskId") String taskId) {
String methodDescribe = getMethodDescribe("status");
LogUtil.njcnDebug(log, "{}开始查询补数任务状态taskId={}", methodDescribe, taskId);
AddDataTaskStatusVO result = addDataTaskService.getStatus(taskId);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
}

View File

@@ -0,0 +1,49 @@
package com.njcn.gather.tool.adddata.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.LogUtil;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO;
import com.njcn.gather.tool.adddata.service.AddDataTemplateService;
import com.njcn.web.controller.BaseController;
import com.njcn.web.utils.HttpResultUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* 数据补录模板展示接口。
*/
@Slf4j
@Api(tags = "数据补录模板")
@RestController
@RequestMapping("/addData/template")
@RequiredArgsConstructor
public class AddDataTemplateController extends BaseController {
/** 数据补录模板服务。 */
private final AddDataTemplateService addDataTemplateService;
/**
* 返回前端模板展示规则。
*
* @return 模板列表
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("查询数据补录模板规则")
@GetMapping("/list")
public HttpResult<List<AddDataTemplateVO>> list() {
String methodDescribe = getMethodDescribe("list");
LogUtil.njcnDebug(log, "{},开始查询数据补录模板规则", methodDescribe);
List<AddDataTemplateVO> result = addDataTemplateService.listTemplates();
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
}

View File

@@ -0,0 +1,38 @@
package com.njcn.gather.tool.adddata.pojo.bo;
import lombok.Getter;
/**
* 批量写入结果。
*/
@Getter
public class AddDataBatchWriteResult {
/** 成功插入条数。 */
private final long insertedCount;
/** 因主键重复被跳过的条数。 */
private final long skippedCount;
/** 非主键冲突失败条数。 */
private final long failedCount;
/** 第一条失败原因。 */
private final String firstFailureMessage;
public AddDataBatchWriteResult(long insertedCount, long skippedCount, long failedCount, String firstFailureMessage) {
this.insertedCount = insertedCount;
this.skippedCount = skippedCount;
this.failedCount = failedCount;
this.firstFailureMessage = firstFailureMessage;
}
/**
* 当前批次是否存在失败。
*
* @return true 表示存在失败
*/
public boolean hasFailure() {
return failedCount > 0;
}
}

View File

@@ -0,0 +1,64 @@
package com.njcn.gather.tool.adddata.pojo.bo;
import lombok.Getter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* 数据补录表定义。
*/
@Getter
public class AddDataTableDefinition {
/** 分表时间轴类型。 */
public enum TimeAxisType {
/** 使用前端传入步长。 */
REQUEST_INTERVAL,
/** 固定 10 分钟。 */
FIXED_TEN_MINUTES,
/** 固定 2 小时。 */
FIXED_TWO_HOURS
}
/** 表名。 */
private final String tableName;
/** 字段列表,顺序与落库 SQL 保持一致。 */
private final List<String> columns;
/** 首版实际落库相别集合。 */
private final List<String> phaseCodes;
/** 单批写入大小。 */
private final int batchSize;
/** 时间轴类型。 */
private final TimeAxisType timeAxisType;
public AddDataTableDefinition(String tableName, List<String> columns, List<String> phaseCodes,
int batchSize, TimeAxisType timeAxisType) {
this.tableName = tableName;
this.columns = Collections.unmodifiableList(new ArrayList<String>(columns));
this.phaseCodes = Collections.unmodifiableList(new ArrayList<String>(phaseCodes));
this.batchSize = batchSize;
this.timeAxisType = timeAxisType;
}
/**
* 解析当前表真实使用的时间步长。
*
* @param requestIntervalMinutes 前端传入步长
* @return 当前表步长
*/
public int resolveIntervalMinutes(int requestIntervalMinutes) {
if (TimeAxisType.FIXED_TEN_MINUTES.equals(timeAxisType)) {
return 10;
}
if (TimeAxisType.FIXED_TWO_HOURS.equals(timeAxisType)) {
return 120;
}
return requestIntervalMinutes;
}
}

View File

@@ -0,0 +1,34 @@
package com.njcn.gather.tool.adddata.pojo.bo;
import lombok.Getter;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* 归一化后的补数任务命令。
*/
@Getter
public class AddDataTaskCommand {
/** 监测点 ID 列表。 */
private final List<String> lineIds;
/** 开始时间。 */
private final LocalDateTime startTime;
/** 结束时间。 */
private final LocalDateTime endTime;
/** 用户步长。 */
private final int intervalMinutes;
public AddDataTaskCommand(List<String> lineIds, LocalDateTime startTime, LocalDateTime endTime, int intervalMinutes) {
this.lineIds = Collections.unmodifiableList(new ArrayList<String>(lineIds));
this.startTime = startTime;
this.endTime = endTime;
this.intervalMinutes = intervalMinutes;
}
}

View File

@@ -0,0 +1,15 @@
package com.njcn.gather.tool.adddata.pojo.enums;
/**
* 数据补录任务状态。
*/
public enum AddDataTaskStatusEnum {
/** 等待执行。 */
WAITING,
/** 执行中。 */
RUNNING,
/** 执行成功。 */
SUCCESS,
/** 执行失败。 */
FAILED
}

View File

@@ -0,0 +1,35 @@
package com.njcn.gather.tool.adddata.pojo.param;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.List;
/**
* 数据补录任务请求参数。
*/
@Data
@ApiModel("数据补录任务请求参数")
public class AddDataTaskRequestParam {
@ApiModelProperty(value = "监测点 ID 列表", required = true)
@NotEmpty(message = "监测点 ID 列表不能为空")
private List<String> lineIds = new ArrayList<String>();
@ApiModelProperty(value = "开始时间,格式 yyyy-MM-dd HH:mm:ss", required = true)
@NotBlank(message = "开始时间不能为空")
private String startTime;
@ApiModelProperty(value = "结束时间,格式 yyyy-MM-dd HH:mm:ss", required = true)
@NotBlank(message = "结束时间不能为空")
private String endTime;
@ApiModelProperty(value = "用户步长,仅支持 1/3/5/10 分钟", required = true)
@NotNull(message = "时间步长不能为空")
private Integer intervalMinutes;
}

View File

@@ -0,0 +1,22 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import lombok.Data;
/**
* 单表预估结果。
*/
@Data
public class AddDataPreviewTableVO {
/** 表名。 */
private String tableName;
/** 当前表命中的时间点数量。 */
private long timePointCount;
/** 当前表实际展开的落库相别数量。 */
private int phaseCount;
/** 当前表预估写入条数。 */
private long rowCount;
}

View File

@@ -0,0 +1,25 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* 补数规模预估结果。
*/
@Data
public class AddDataPreviewVO {
/** 监测点数量。 */
private int lineCount;
/** 用户步长。 */
private int intervalMinutes;
/** 总预估条数。 */
private long totalRowCount;
/** 单表预估详情。 */
private List<AddDataPreviewTableVO> tableStats = new ArrayList<AddDataPreviewTableVO>();
}

View File

@@ -0,0 +1,16 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import lombok.Data;
/**
* 创建任务返回值。
*/
@Data
public class AddDataTaskCreateVO {
/** 任务编号。 */
private String taskId;
/** 初始状态。 */
private String status;
}

View File

@@ -0,0 +1,46 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* 数据补录任务状态。
*/
@Data
public class AddDataTaskStatusVO {
/** 任务编号。 */
private String taskId;
/** 当前状态。 */
private String status;
/** 当前执行表名。 */
private String currentTableName;
/** 当前批次描述。 */
private String currentBatchInfo;
/** 成功写入数量。 */
private long insertedCount;
/** 主键重复跳过数量。 */
private long skippedCount;
/** 非主键冲突失败数量。 */
private long failedCount;
/** 失败原因。 */
private String failureReason;
/** 每小时返回给前端展示的业务时刻。 */
private List<String> hourlyTimeResults = new ArrayList<String>();
/** 任务开始时间。 */
private String startTime;
/** 任务结束时间。 */
private String endTime;
}

View File

@@ -0,0 +1,46 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* 前端展示模板规则。
*/
@Data
public class AddDataTemplateVO {
/** 电能质量参数名称。 */
private String parameterName;
/** 关联表名。 */
private String tableName;
/** 前端展示相别。 */
private String phaseDisplay;
/** 实际落库相别集合。 */
private List<String> phaseCodes = new ArrayList<String>();
/** 当前模板是否展示。 */
private boolean display;
/** 是否展示是否合格列。 */
private boolean showQualified;
/** 最大值规则。 */
private String maxValueRule;
/** 最小值规则。 */
private String minValueRule;
/** 平均值规则。 */
private String averageValueRule;
/** 95% 概率大值规则。 */
private String cp95ValueRule;
/** 默认保留小数位数。 */
private int decimalScale;
}

View File

@@ -0,0 +1,36 @@
package com.njcn.gather.tool.adddata.service;
import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
/**
* 数据补录任务服务。
*/
public interface AddDataTaskService {
/**
* 预估补数规模。
*
* @param param 补数参数
* @return 预估结果
*/
AddDataPreviewVO preview(AddDataTaskRequestParam param);
/**
* 创建补数任务。
*
* @param param 补数参数
* @return 任务创建结果
*/
AddDataTaskCreateVO create(AddDataTaskRequestParam param);
/**
* 查询补数任务状态。
*
* @param taskId 任务编号
* @return 当前任务状态
*/
AddDataTaskStatusVO getStatus(String taskId);
}

View File

@@ -0,0 +1,18 @@
package com.njcn.gather.tool.adddata.service;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO;
import java.util.List;
/**
* 数据补录模板服务。
*/
public interface AddDataTemplateService {
/**
* 返回前端参数模板。
*
* @return 模板列表
*/
List<AddDataTemplateVO> listTemplates();
}

View File

@@ -0,0 +1,142 @@
package com.njcn.gather.tool.adddata.service.impl;
import com.njcn.gather.tool.adddata.component.AddDataTableRegistry;
import com.njcn.gather.tool.adddata.component.AddDataTaskExecutor;
import com.njcn.gather.tool.adddata.component.AddDataTaskStatusHolder;
import com.njcn.gather.tool.adddata.component.AddDataTimeSlotCalculator;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand;
import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewTableVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
import com.njcn.gather.tool.adddata.service.AddDataTaskService;
import com.njcn.gather.tool.adddata.util.AddDataDateTimeUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* 数据补录任务服务实现。
*/
@Service
@RequiredArgsConstructor
public class AddDataTaskServiceImpl implements AddDataTaskService {
/** 支持的用户步长。 */
private static final Set<Integer> SUPPORTED_INTERVALS = new LinkedHashSet<Integer>(Arrays.asList(1, 3, 5, 10));
/** 表定义注册器。 */
private final AddDataTableRegistry addDataTableRegistry;
/** 时间槽计算器。 */
private final AddDataTimeSlotCalculator addDataTimeSlotCalculator;
/** 任务状态持有器。 */
private final AddDataTaskStatusHolder addDataTaskStatusHolder;
/** 后台执行器。 */
private final AddDataTaskExecutor addDataTaskExecutor;
@Override
public AddDataPreviewVO preview(AddDataTaskRequestParam param) {
AddDataTaskCommand command = buildCommand(param);
AddDataPreviewVO result = new AddDataPreviewVO();
result.setLineCount(command.getLineIds().size());
result.setIntervalMinutes(command.getIntervalMinutes());
long totalRowCount = 0L;
List<AddDataPreviewTableVO> tableStats = new ArrayList<AddDataPreviewTableVO>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
long timePointCount = addDataTimeSlotCalculator.countTimeSlots(
command.getStartTime(), command.getEndTime(), definition.resolveIntervalMinutes(command.getIntervalMinutes()));
long rowCount = timePointCount * command.getLineIds().size() * definition.getPhaseCodes().size();
AddDataPreviewTableVO tableVO = new AddDataPreviewTableVO();
tableVO.setTableName(definition.getTableName());
tableVO.setTimePointCount(timePointCount);
tableVO.setPhaseCount(definition.getPhaseCodes().size());
tableVO.setRowCount(rowCount);
tableStats.add(tableVO);
totalRowCount += rowCount;
}
result.setTableStats(tableStats);
result.setTotalRowCount(totalRowCount);
return result;
}
@Override
public AddDataTaskCreateVO create(AddDataTaskRequestParam param) {
AddDataTaskCommand command = buildCommand(param);
AddDataTaskStatusVO snapshot = addDataTaskStatusHolder.createWaitingTask(command);
addDataTaskExecutor.submit(snapshot.getTaskId(), command);
AddDataTaskCreateVO result = new AddDataTaskCreateVO();
result.setTaskId(snapshot.getTaskId());
result.setStatus(snapshot.getStatus());
return result;
}
@Override
public AddDataTaskStatusVO getStatus(String taskId) {
if (taskId == null || taskId.trim().isEmpty()) {
throw new IllegalArgumentException("任务编号不能为空");
}
return addDataTaskStatusHolder.getStatus(taskId.trim());
}
/**
* 归一化任务命令。
*
* @param param 请求参数
* @return 任务命令
*/
private AddDataTaskCommand buildCommand(AddDataTaskRequestParam param) {
if (param == null) {
throw new IllegalArgumentException("补数参数不能为空");
}
Integer intervalMinutes = param.getIntervalMinutes();
if (intervalMinutes == null || !SUPPORTED_INTERVALS.contains(intervalMinutes)) {
throw new IllegalArgumentException("时间步长仅支持 1、3、5、10 分钟");
}
List<String> lineIds = normalizeLineIds(param.getLineIds());
LocalDateTime startTime = AddDataDateTimeUtil.parse(param.getStartTime());
LocalDateTime endTime = AddDataDateTimeUtil.parse(param.getEndTime());
if (startTime.isAfter(endTime)) {
throw new IllegalArgumentException("开始时间不能大于结束时间");
}
return new AddDataTaskCommand(lineIds, startTime, endTime, intervalMinutes);
}
/**
* 标准化监测点列表。
*
* @param lineIds 原始监测点列表
* @return 去重后的监测点列表
*/
private List<String> normalizeLineIds(List<String> lineIds) {
if (lineIds == null || lineIds.isEmpty()) {
throw new IllegalArgumentException("监测点 ID 列表不能为空");
}
LinkedHashSet<String> normalized = new LinkedHashSet<String>();
for (String lineId : lineIds) {
if (lineId == null) {
throw new IllegalArgumentException("监测点 ID 不能为空");
}
String normalizedLineId = lineId.trim();
// LINEID 已改为 char(32),这里同步限制字符串长度,避免运行时写入越界。
if (normalizedLineId.isEmpty()) {
throw new IllegalArgumentException("监测点 ID 不能为空");
}
if (normalizedLineId.length() > 32) {
throw new IllegalArgumentException("监测点 ID 长度不能超过 32 位");
}
normalized.add(normalizedLineId);
}
return new ArrayList<String>(normalized);
}
}

View File

@@ -0,0 +1,25 @@
package com.njcn.gather.tool.adddata.service.impl;
import com.njcn.gather.tool.adddata.component.AddDataTemplateRegistry;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO;
import com.njcn.gather.tool.adddata.service.AddDataTemplateService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* 数据补录模板服务实现。
*/
@Service
@RequiredArgsConstructor
public class AddDataTemplateServiceImpl implements AddDataTemplateService {
/** 模板注册器。 */
private final AddDataTemplateRegistry addDataTemplateRegistry;
@Override
public List<AddDataTemplateVO> listTemplates() {
return addDataTemplateRegistry.getTemplates();
}
}

View File

@@ -0,0 +1,51 @@
package com.njcn.gather.tool.adddata.util;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
/**
* add-data 模块时间工具。
*/
public final class AddDataDateTimeUtil {
/** 统一返回格式。 */
private static final DateTimeFormatter OUTPUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/** 允许的入参时间格式。 */
private static final DateTimeFormatter[] INPUT_FORMATTERS = new DateTimeFormatter[]{
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"),
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"),
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm")
};
private AddDataDateTimeUtil() {
}
/**
* 解析请求时间。
*
* @param timeText 时间文本
* @return 解析后的时间
*/
public static LocalDateTime parse(String timeText) {
for (DateTimeFormatter formatter : INPUT_FORMATTERS) {
try {
return LocalDateTime.parse(timeText, formatter);
} catch (DateTimeParseException ignored) {
}
}
throw new IllegalArgumentException("时间格式不正确,仅支持 yyyy-MM-dd HH:mm:ss 或 yyyy-MM-dd'T'HH:mm:ss");
}
/**
* 格式化时间。
*
* @param time 时间对象
* @return 格式化结果
*/
public static String format(LocalDateTime time) {
return time == null ? null : time.format(OUTPUT_FORMATTER);
}
}

View File

@@ -0,0 +1,28 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
/**
* 表定义注册测试。
*/
class AddDataTableRegistryTest {
@Test
void shouldLoadAllThirteenTablesFromSchema() throws Exception {
AddDataTableRegistry registry = new AddDataTableRegistry();
registry.afterPropertiesSet();
List<AddDataTableDefinition> definitions = registry.getTableDefinitions();
Assertions.assertEquals(13, definitions.size());
Assertions.assertEquals("data_flicker", definitions.get(0).getTableName());
Assertions.assertEquals("data_v", definitions.get(definitions.size() - 1).getTableName());
Assertions.assertTrue(registry.getDefinition("data_v").getColumns().contains("V_THD"));
Assertions.assertEquals(4, registry.getDefinition("data_v").getPhaseCodes().size());
Assertions.assertTrue(registry.getDefinition("data_v").getPhaseCodes().contains("T"));
}
}

View File

@@ -0,0 +1,33 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.Arrays;
/**
* 补数任务状态持有器测试。
*/
class AddDataTaskStatusHolderTest {
private final AddDataTaskStatusHolder holder = new AddDataTaskStatusHolder(new AddDataTimeSlotCalculator());
@Test
void shouldReturnHourlyTimeResultsWhenCreateTask() {
AddDataTaskCommand command = new AddDataTaskCommand(
Arrays.asList("1"),
LocalDateTime.of(2026, 4, 28, 10, 7, 0),
LocalDateTime.of(2026, 4, 28, 13, 0, 0),
5);
AddDataTaskStatusVO status = holder.createWaitingTask(command);
Assertions.assertEquals(Arrays.asList(
"2026-04-28 11:00:00",
"2026-04-28 12:00:00",
"2026-04-28 13:00:00"), status.getHourlyTimeResults());
}
}

View File

@@ -0,0 +1,33 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* 模板注册测试。
*/
class AddDataTemplateRegistryTest {
@Test
void shouldOnlyExposeABCTPhaseCodes() {
AddDataTemplateRegistry registry = new AddDataTemplateRegistry();
List<AddDataTemplateVO> templates = registry.getTemplates();
Set<String> allowedPhaseCodes = new HashSet<String>();
allowedPhaseCodes.add("A");
allowedPhaseCodes.add("B");
allowedPhaseCodes.add("C");
allowedPhaseCodes.add("T");
for (AddDataTemplateVO template : templates) {
for (String phaseCode : template.getPhaseCodes()) {
Assertions.assertTrue(allowedPhaseCodes.contains(phaseCode));
}
}
}
}

View File

@@ -0,0 +1,61 @@
package com.njcn.gather.tool.adddata.component;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.List;
/**
* 时间槽计算测试。
*/
class AddDataTimeSlotCalculatorTest {
private final AddDataTimeSlotCalculator calculator = new AddDataTimeSlotCalculator();
@Test
void shouldAlignToNextNaturalSlot() {
LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 7, 12);
LocalDateTime end = LocalDateTime.of(2026, 4, 28, 10, 22, 0);
List<LocalDateTime> slots = calculator.buildTimeSlots(start, end, 5);
Assertions.assertEquals(3, slots.size());
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 10, 10, 0), slots.get(0));
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 10, 20, 0), slots.get(2));
}
@Test
void shouldReturnEmptyWhenRangeDoesNotContainAnySlot() {
LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 7, 0);
LocalDateTime end = LocalDateTime.of(2026, 4, 28, 10, 9, 59);
List<LocalDateTime> slots = calculator.buildTimeSlots(start, end, 10);
Assertions.assertTrue(slots.isEmpty());
}
@Test
void shouldBuildHourlySlotsFromNextNaturalHour() {
LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 7, 0);
LocalDateTime end = LocalDateTime.of(2026, 4, 28, 13, 0, 0);
List<LocalDateTime> slots = calculator.buildHourlyTimeSlots(start, end);
Assertions.assertEquals(3, slots.size());
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 11, 0, 0), slots.get(0));
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 13, 0, 0), slots.get(2));
}
@Test
void shouldIncludeStartWhenAlreadyAtNaturalHour() {
LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 0, 0);
LocalDateTime end = LocalDateTime.of(2026, 4, 28, 12, 30, 0);
List<LocalDateTime> slots = calculator.buildHourlyTimeSlots(start, end);
Assertions.assertEquals(3, slots.size());
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 10, 0, 0), slots.get(0));
Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 12, 0, 0), slots.get(2));
}
}

View File

@@ -0,0 +1,32 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.List;
/**
* 数值生成器测试。
*/
class AddDataValueGeneratorTest {
@Test
void shouldGenerateStableDataVRowWithExpectedColumnCount() throws Exception {
AddDataTableRegistry registry = new AddDataTableRegistry();
registry.afterPropertiesSet();
AddDataTableDefinition definition = registry.getDefinition("data_v");
AddDataValueGenerator generator = new AddDataValueGenerator();
List<Object> row = generator.generateRow(definition, "f04a9d62e3d24e6580e4f32b40967505", LocalDateTime.of(2026, 4, 28, 10, 10, 0), "A");
Assertions.assertEquals(definition.getColumns().size(), row.size());
Assertions.assertEquals("A", row.get(definition.getColumns().indexOf("PHASIC_TYPE")));
Double rms = (Double) row.get(definition.getColumns().indexOf("RMS"));
Double rmsMax = (Double) row.get(definition.getColumns().indexOf("RMS_MAX"));
Double rmsMin = (Double) row.get(definition.getColumns().indexOf("RMS_MIN"));
Assertions.assertTrue(rmsMax >= rms);
Assertions.assertTrue(rmsMin <= rms);
}
}