diff --git a/AGENTS.md b/AGENTS.md index 1488a3c..709a9c5 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -33,6 +33,7 @@ Java 源码位于 `src/main/java`,配置文件位于 `src/main/resources`,My ## 代码风格与命名规范 保持现有 Java 风格:4 空格缩进、UTF-8 文件编码、基础包名使用 `com.njcn.gather`。命名沿用分层后缀,如 `*Controller`、`*Service`、`*ServiceImpl`、`*Mapper`、`*Param`、`*PO`、`*VO`。优先复用现有 Lombok 注解,如 `@Data`、`@RequiredArgsConstructor`、`@Slf4j`。Mapper XML 文件名应与接口名保持一致。业务代码中,关键流程、分支判断、状态流转或容易误解的节点需要补充简洁的中文注释,但不要添加无信息量的注释。 +- 参照 `user` 模块的组织方式,Controller 与 Service 都按职责拆分;不同职责的方法放到不同 `*Controller`、`*Service`、`*ServiceImpl` 中,同一模块后续新增方法也要沿既有职责边界归类,不再回退为单一大类承载全部接口。 ## 数据与 SQL 约束 - 新增业务表的 DO 优先复用当前 `BaseDO` / 审计字段风格;除非表本身明确不需要逻辑删除,不要再引入另一套审计基类。 diff --git a/entrance/pom.xml b/entrance/pom.xml index 24d6a1d..2a3e7b2 100644 --- a/entrance/pom.xml +++ b/entrance/pom.xml @@ -48,6 +48,11 @@ mms-mapping 1.0.0 + + com.njcn.gather + add-data + 1.0.0 + diff --git a/tools/add-data/pom.xml b/tools/add-data/pom.xml new file mode 100644 index 0000000..73e794f --- /dev/null +++ b/tools/add-data/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + + com.njcn.gather + tools + 1.0.0 + + + add-data + + + + + + + com.njcn + njcn-common + 0.0.1 + + + + com.njcn + spingboot2.3.12 + 2.3.12 + + + + org.springframework + spring-jdbc + + + + org.springframework.boot + spring-boot-starter-test + test + + + diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataBatchWriter.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataBatchWriter.java new file mode 100644 index 0000000..75a0118 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataBatchWriter.java @@ -0,0 +1,135 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataBatchWriteResult; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * 批量写入组件。 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class AddDataBatchWriter { + + /** JDBC 模板。 */ + private final JdbcTemplate jdbcTemplate; + + /** + * 写入一个批次。 + * + * @param definition 表定义 + * @param rows 行数据 + * @return 写入结果 + */ + public AddDataBatchWriteResult writeBatch(AddDataTableDefinition definition, List> rows) { + if (rows == null || rows.isEmpty()) { + return new AddDataBatchWriteResult(0L, 0L, 0L, null); + } + try { + return executeInsertIgnore(definition, rows); + } catch (DataAccessException ex) { + log.warn("批量写入失败,开始降级为逐行写入,table={}, batchSize={}, message={}", + definition.getTableName(), rows.size(), resolveErrorMessage(ex)); + return fallbackWriteOneByOne(definition, rows, ex); + } + } + + /** + * 执行批量 INSERT IGNORE。 + * + * @param definition 表定义 + * @param rows 行数据 + * @return 写入结果 + */ + private AddDataBatchWriteResult executeInsertIgnore(AddDataTableDefinition definition, List> rows) { + String sql = buildInsertIgnoreSql(definition, rows.size()); + List args = flattenRows(rows); + int affectedRows = jdbcTemplate.update(sql, args.toArray(new Object[0])); + long insertedCount = Math.max(affectedRows, 0); + long skippedCount = rows.size() - insertedCount; + return new AddDataBatchWriteResult(insertedCount, skippedCount, 0L, null); + } + + /** + * 批量写入失败后逐行降级。 + * + * @param definition 表定义 + * @param rows 行数据 + * @param batchException 原始批量异常 + * @return 写入结果 + */ + private AddDataBatchWriteResult fallbackWriteOneByOne(AddDataTableDefinition definition, List> rows, DataAccessException batchException) { + long insertedCount = 0L; + long skippedCount = 0L; + long failedCount = 0L; + String firstFailureMessage = null; + String sql = buildInsertIgnoreSql(definition, 1); + for (List row : rows) { + try { + int affectedRows = jdbcTemplate.update(sql, row.toArray(new Object[0])); + if (affectedRows > 0) { + insertedCount += affectedRows; + } else { + skippedCount++; + } + } catch (DataAccessException ex) { + failedCount++; + if (firstFailureMessage == null) { + firstFailureMessage = resolveErrorMessage(ex); + } + } + } + if (firstFailureMessage == null) { + firstFailureMessage = resolveErrorMessage(batchException); + } + return new AddDataBatchWriteResult(insertedCount, skippedCount, failedCount, firstFailureMessage); + } + + /** + * 构建 INSERT IGNORE SQL。 + * + * @param definition 表定义 + * @param rowCount 行数 + * @return SQL 文本 + */ + private String buildInsertIgnoreSql(AddDataTableDefinition definition, int rowCount) { + String columnSegment = "`" + String.join("`,`", definition.getColumns()) + "`"; + String placeholderSegment = "(" + String.join(",", Collections.nCopies(definition.getColumns().size(), "?")) + ")"; + String valuesSegment = String.join(",", Collections.nCopies(rowCount, placeholderSegment)); + return "INSERT IGNORE INTO `" + definition.getTableName() + "` (" + columnSegment + ") VALUES " + valuesSegment; + } + + /** + * 展平批量参数。 + * + * @param rows 行数据 + * @return 扁平参数列表 + */ + private List flattenRows(List> rows) { + List args = new ArrayList(); + for (List row : rows) { + args.addAll(row); + } + return args; + } + + /** + * 提取错误信息。 + * + * @param ex 异常 + * @return 错误信息 + */ + private String resolveErrorMessage(DataAccessException ex) { + Throwable root = ex.getMostSpecificCause(); + return root == null ? ex.getMessage() : root.getMessage(); + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTableRegistry.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTableRegistry.java new file mode 100644 index 0000000..11681ae --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTableRegistry.java @@ -0,0 +1,152 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.core.io.ClassPathResource; +import org.springframework.stereotype.Component; +import org.springframework.util.StreamUtils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * 数据补录表定义注册器。 + */ +@Slf4j +@Component +public class AddDataTableRegistry implements InitializingBean { + + /** 元数据 SQL 资源。 */ + private static final String SCHEMA_RESOURCE_PATH = "sql/add-data/DATA_FLICKER.sql"; + + /** 建表语句解析表达式。 */ + private static final Pattern TABLE_PATTERN = Pattern.compile("CREATE TABLE\\s+`([^`]+)`\\s*\\((.*?)\\)\\s*ENGINE", + Pattern.CASE_INSENSITIVE | Pattern.DOTALL); + + /** 列定义解析表达式。 */ + private static final Pattern COLUMN_PATTERN = Pattern.compile("^\\s*`([^`]+)`", Pattern.MULTILINE); + + /** 首版统一按 A/B/C/T 四类相别落库。 */ + private static final List WRITE_PHASE_CODES = Collections.unmodifiableList(Arrays.asList("A", "B", "C", "T")); + + /** 宽表批次大小。 */ + private static final int WIDE_TABLE_BATCH_SIZE = 100; + + /** 窄表批次大小。 */ + private static final int NARROW_TABLE_BATCH_SIZE = 500; + + /** 表定义列表。 */ + private List tableDefinitions = Collections.emptyList(); + + @Override + public void afterPropertiesSet() throws Exception { + Map> columnMap = parseSchemaColumns(); + List definitions = new ArrayList(); + definitions.add(buildDefinition(columnMap, "data_flicker", AddDataTableDefinition.TimeAxisType.FIXED_TEN_MINUTES, NARROW_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_fluc", AddDataTableDefinition.TimeAxisType.FIXED_TEN_MINUTES, NARROW_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_harmphasic_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_harmphasic_v", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_harmpower_p", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_harmpower_q", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_harmpower_s", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_harmrate_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_harmrate_v", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_inharm_i", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_plt", AddDataTableDefinition.TimeAxisType.FIXED_TWO_HOURS, NARROW_TABLE_BATCH_SIZE)); + definitions.add(buildDefinition(columnMap, "data_v", AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL, WIDE_TABLE_BATCH_SIZE)); + tableDefinitions = Collections.unmodifiableList(definitions); + log.info("加载 add-data 表定义完成,tableCount={}", tableDefinitions.size()); + } + + /** + * 返回全部表定义。 + * + * @return 表定义列表 + */ + public List getTableDefinitions() { + return tableDefinitions; + } + + /** + * 根据表名查找表定义。 + * + * @param tableName 表名 + * @return 表定义 + */ + public AddDataTableDefinition getDefinition(String tableName) { + for (AddDataTableDefinition definition : tableDefinitions) { + if (definition.getTableName().equals(tableName)) { + return definition; + } + } + throw new IllegalArgumentException("未找到 add-data 表定义:" + tableName); + } + + /** + * 解析 schema SQL 中的列定义。 + * + * @return 表名与字段列表映射 + */ + private Map> parseSchemaColumns() { + String schemaText = loadSchemaText(); + Matcher tableMatcher = TABLE_PATTERN.matcher(schemaText); + Map> columnMap = new LinkedHashMap>(); + while (tableMatcher.find()) { + String tableName = tableMatcher.group(1); + String tableBody = tableMatcher.group(2); + Matcher columnMatcher = COLUMN_PATTERN.matcher(tableBody); + List columns = new ArrayList(); + while (columnMatcher.find()) { + columns.add(columnMatcher.group(1)); + } + columnMap.put(tableName, columns); + } + return columnMap; + } + + /** + * 创建单表定义。 + * + * @param columnMap 字段映射 + * @param tableName 表名 + * @param timeAxisType 时间轴类型 + * @param batchSize 批次大小 + * @return 表定义 + */ + private AddDataTableDefinition buildDefinition(Map> columnMap, String tableName, + AddDataTableDefinition.TimeAxisType timeAxisType, int batchSize) { + List columns = columnMap.get(tableName); + if (columns == null || columns.isEmpty()) { + throw new IllegalStateException("未从 SQL 元数据中解析到表字段:" + tableName); + } + /* + * 当前按用户最新确认,13 张表统一支持 A/B/C/T 四类数据类型。 + * 如果后续补齐逐表更细的相别映射,只需要在这里调整落库相别集合即可。 + */ + return new AddDataTableDefinition(tableName, columns, WRITE_PHASE_CODES, batchSize, timeAxisType); + } + + /** + * 加载 schema SQL 文本。 + * + * @return SQL 文本 + */ + private String loadSchemaText() { + ClassPathResource resource = new ClassPathResource(SCHEMA_RESOURCE_PATH); + try { + return StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new IllegalStateException("读取 add-data SQL 元数据失败:" + SCHEMA_RESOURCE_PATH, e); + } + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTaskExecutor.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTaskExecutor.java new file mode 100644 index 0000000..2847cfb --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTaskExecutor.java @@ -0,0 +1,310 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataBatchWriteResult; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ExecutorService; + +/** + * 异步任务执行器。 + */ +@Slf4j +@Component +public class AddDataTaskExecutor { + + /** 多个时间点组成一个处理窗口,兼顾批量效率和内存占用。 */ + private static final int TIME_WINDOW_SIZE = 30; + + /** 任务线程池。 */ + @Qualifier("addDataTaskExecutorService") + private final ExecutorService addDataTaskExecutorService; + + /** 表定义注册器。 */ + private final AddDataTableRegistry addDataTableRegistry; + + /** 时间槽计算器。 */ + private final AddDataTimeSlotCalculator addDataTimeSlotCalculator; + + /** 数据生成器。 */ + private final AddDataValueGenerator addDataValueGenerator; + + /** 批量写入组件。 */ + private final AddDataBatchWriter addDataBatchWriter; + + /** 状态持有器。 */ + private final AddDataTaskStatusHolder addDataTaskStatusHolder; + + public AddDataTaskExecutor(@Qualifier("addDataTaskExecutorService") ExecutorService addDataTaskExecutorService, + AddDataTableRegistry addDataTableRegistry, + AddDataTimeSlotCalculator addDataTimeSlotCalculator, + AddDataValueGenerator addDataValueGenerator, + AddDataBatchWriter addDataBatchWriter, + AddDataTaskStatusHolder addDataTaskStatusHolder) { + this.addDataTaskExecutorService = addDataTaskExecutorService; + this.addDataTableRegistry = addDataTableRegistry; + this.addDataTimeSlotCalculator = addDataTimeSlotCalculator; + this.addDataValueGenerator = addDataValueGenerator; + this.addDataBatchWriter = addDataBatchWriter; + this.addDataTaskStatusHolder = addDataTaskStatusHolder; + } + + /** + * 提交后台任务。 + * + * @param taskId 任务编号 + * @param command 任务命令 + */ + public void submit(String taskId, AddDataTaskCommand command) { + addDataTaskExecutorService.submit(() -> execute(taskId, command)); + } + + /** + * 执行补数任务。 + * + * @param taskId 任务编号 + * @param command 任务命令 + */ + private void execute(String taskId, AddDataTaskCommand command) { + try { + addDataTaskStatusHolder.markRunning(taskId); + Map> timeSlotsByTable = buildTimeSlotsByTable(command); + Map> timeSlotLookupByTable = buildTimeSlotLookupByTable(timeSlotsByTable); + List mergedTimeSlots = mergeTimeSlots(timeSlotsByTable); + Map batchNoMap = new HashMap(); + Map>> pendingRowsByTable = buildPendingRowsByTable(); + for (int startIndex = 0; startIndex < mergedTimeSlots.size(); startIndex += TIME_WINDOW_SIZE) { + int endIndex = Math.min(startIndex + TIME_WINDOW_SIZE, mergedTimeSlots.size()); + List timeWindow = mergedTimeSlots.subList(startIndex, endIndex); + GeneratedBatchData generatedBatchData = generateBatchData(command, timeWindow, timeSlotLookupByTable); + if (!writeBatchData(taskId, generatedBatchData, pendingRowsByTable, batchNoMap)) { + return; + } + } + if (!flushRemainingBatchData(taskId, pendingRowsByTable, batchNoMap)) { + return; + } + addDataTaskStatusHolder.markSuccess(taskId); + } catch (Exception ex) { + log.error("执行补数任务失败,taskId={}", taskId, ex); + addDataTaskStatusHolder.markFailed(taskId, ex.getMessage()); + } + } + + /** + * 为每张表构建各自命中的时间槽。 + * + * @param command 任务命令 + * @return 按表名分组的时间槽 + */ + private Map> buildTimeSlotsByTable(AddDataTaskCommand command) { + Map> result = new LinkedHashMap>(); + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + int intervalMinutes = definition.resolveIntervalMinutes(command.getIntervalMinutes()); + List timeSlots = addDataTimeSlotCalculator.buildTimeSlots( + command.getStartTime(), command.getEndTime(), intervalMinutes); + result.put(definition.getTableName(), timeSlots); + } + return result; + } + + /** + * 为每张表补充时间槽查找索引,避免窗口生成时重复线性扫描。 + * + * @param timeSlotsByTable 按表名分组的时间槽 + * @return 按表名分组的时间槽查找集合 + */ + private Map> buildTimeSlotLookupByTable(Map> timeSlotsByTable) { + Map> result = new LinkedHashMap>(); + for (Map.Entry> entry : timeSlotsByTable.entrySet()) { + result.put(entry.getKey(), new HashSet(entry.getValue())); + } + return result; + } + + /** + * 为每张表初始化跨窗口复用的待写入缓存。 + * + * @return 按表名分组的待写入缓存 + */ + private Map>> buildPendingRowsByTable() { + Map>> result = new LinkedHashMap>>(); + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + result.put(definition.getTableName(), new ArrayList>()); + } + return result; + } + + /** + * 合并所有表的时间槽,得到统一的自然时间轴。 + * + * @param timeSlotsByTable 按表名分组的时间槽 + * @return 去重后的统一时间轴 + */ + private List mergeTimeSlots(Map> timeSlotsByTable) { + TreeSet merged = new TreeSet(); + for (List timeSlots : timeSlotsByTable.values()) { + merged.addAll(timeSlots); + } + return new ArrayList(merged); + } + + /** + * 按多个时间点窗口生成待写入数据,不直接写库。 + * + * @param command 任务命令 + * @param timeWindow 当前时间窗口 + * @param timeSlotLookupByTable 各表命中的时间槽索引 + * @return 当前窗口生成结果 + */ + private GeneratedBatchData generateBatchData(AddDataTaskCommand command, List timeWindow, + Map> timeSlotLookupByTable) { + Map>> rowsByTable = new LinkedHashMap>>(); + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + rowsByTable.put(definition.getTableName(), new ArrayList>()); + } + for (LocalDateTime timeSlot : timeWindow) { + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + Set tableTimeSlots = timeSlotLookupByTable.get(definition.getTableName()); + if (!tableTimeSlots.contains(timeSlot)) { + continue; + } + List> rows = rowsByTable.get(definition.getTableName()); + for (String lineId : command.getLineIds()) { + for (String phaseCode : definition.getPhaseCodes()) { + rows.add(addDataValueGenerator.generateRow(definition, lineId, timeSlot, phaseCode)); + } + } + } + } + return new GeneratedBatchData(rowsByTable); + } + + /** + * 写入当前窗口已经生成的数据。 + * + * @param taskId 任务编号 + * @param generatedBatchData 当前窗口生成结果 + * @param pendingRowsByTable 跨窗口复用的待写入缓存 + * @param batchNoMap 每张表的批次号缓存 + * @return true 表示继续执行 + */ + private boolean writeBatchData(String taskId, GeneratedBatchData generatedBatchData, + Map>> pendingRowsByTable, Map batchNoMap) { + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + List> rows = generatedBatchData.getRows(definition.getTableName()); + if (rows.isEmpty()) { + continue; + } + List> pendingRows = pendingRowsByTable.get(definition.getTableName()); + pendingRows.addAll(rows); + int batchSize = definition.getBatchSize(); + while (pendingRows.size() >= batchSize) { + List> batchRows = new ArrayList>(pendingRows.subList(0, batchSize)); + int batchNo = nextBatchNo(batchNoMap, definition.getTableName()); + if (!flushBatch(taskId, definition, batchRows, batchNo)) { + return false; + } + pendingRows.subList(0, batchSize).clear(); + } + } + return true; + } + + /** + * 任务结束后刷新所有未满批次的尾批数据。 + * + * @param taskId 任务编号 + * @param pendingRowsByTable 跨窗口复用的待写入缓存 + * @param batchNoMap 每张表的批次号缓存 + * @return true 表示继续执行 + */ + private boolean flushRemainingBatchData(String taskId, Map>> pendingRowsByTable, + Map batchNoMap) { + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + List> pendingRows = pendingRowsByTable.get(definition.getTableName()); + if (pendingRows == null || pendingRows.isEmpty()) { + continue; + } + int batchNo = nextBatchNo(batchNoMap, definition.getTableName()); + List> batchRows = new ArrayList>(pendingRows); + if (!flushBatch(taskId, definition, batchRows, batchNo)) { + return false; + } + pendingRows.clear(); + } + return true; + } + + /** + * 获取当前表的下一个批次号。 + * + * @param batchNoMap 批次号缓存 + * @param tableName 表名 + * @return 下一个批次号 + */ + private int nextBatchNo(Map batchNoMap, String tableName) { + Integer currentBatchNo = batchNoMap.get(tableName); + int nextBatchNo = currentBatchNo == null ? 1 : currentBatchNo + 1; + batchNoMap.put(tableName, nextBatchNo); + return nextBatchNo; + } + + /** + * 刷新当前批次。 + * + * @param taskId 任务编号 + * @param definition 表定义 + * @param batchRows 批次行 + * @param batchNo 批次号 + * @return true 表示继续执行 + */ + private boolean flushBatch(String taskId, AddDataTableDefinition definition, List> batchRows, int batchNo) { + addDataTaskStatusHolder.updateCurrentBatch(taskId, definition.getTableName(), batchNo, batchRows.size()); + AddDataBatchWriteResult writeResult = addDataBatchWriter.writeBatch(definition, batchRows); + addDataTaskStatusHolder.addProgress(taskId, writeResult.getInsertedCount(), writeResult.getSkippedCount(), writeResult.getFailedCount()); + if (writeResult.hasFailure()) { + addDataTaskStatusHolder.markFailed(taskId, + "表 " + definition.getTableName() + " 第 " + batchNo + " 批执行失败:" + writeResult.getFirstFailureMessage()); + return false; + } + return true; + } + + /** + * 当前窗口生成结果。 + */ + private static final class GeneratedBatchData { + + /** 按表名分组的待写入行数据。 */ + private final Map>> rowsByTable; + + private GeneratedBatchData(Map>> rowsByTable) { + this.rowsByTable = rowsByTable; + } + + /** + * 获取指定表的待写入行。 + * + * @param tableName 表名 + * @return 待写入行列表 + */ + private List> getRows(String tableName) { + List> rows = rowsByTable.get(tableName); + return rows == null ? Collections.emptyList() : rows; + } + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTaskStatusHolder.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTaskStatusHolder.java new file mode 100644 index 0000000..7767e39 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTaskStatusHolder.java @@ -0,0 +1,240 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand; +import com.njcn.gather.tool.adddata.pojo.enums.AddDataTaskStatusEnum; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO; +import com.njcn.gather.tool.adddata.util.AddDataDateTimeUtil; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * 任务状态持有器。 + */ +@Component +public class AddDataTaskStatusHolder { + + /** 任务记录缓存。 */ + private final ConcurrentMap taskRecordMap = new ConcurrentHashMap(); + + /** 时间槽计算组件。 */ + private final AddDataTimeSlotCalculator addDataTimeSlotCalculator; + + public AddDataTaskStatusHolder(AddDataTimeSlotCalculator addDataTimeSlotCalculator) { + this.addDataTimeSlotCalculator = addDataTimeSlotCalculator; + } + + /** + * 创建等待中的任务。 + * + * @param command 任务命令 + * @return 任务状态快照 + */ + public AddDataTaskStatusVO createWaitingTask(AddDataTaskCommand command) { + String taskId = UUID.randomUUID().toString().replace("-", ""); + TaskRecord record = new TaskRecord(taskId, command, buildHourlyTimeResults(command)); + taskRecordMap.put(taskId, record); + return copySnapshot(record); + } + + /** + * 标记为运行中。 + * + * @param taskId 任务编号 + */ + public void markRunning(String taskId) { + TaskRecord record = requireRecord(taskId); + synchronized (record) { + record.status = AddDataTaskStatusEnum.RUNNING; + record.startTime = LocalDateTime.now(); + record.currentBatchInfo = "任务已启动"; + } + } + + /** + * 更新当前批次信息。 + * + * @param taskId 任务编号 + * @param tableName 当前表名 + * @param batchNo 批次号 + * @param batchSize 批次大小 + */ + public void updateCurrentBatch(String taskId, String tableName, int batchNo, int batchSize) { + TaskRecord record = requireRecord(taskId); + synchronized (record) { + record.currentTableName = tableName; + record.currentBatchInfo = "第 " + batchNo + " 批," + batchSize + " 行"; + } + } + + /** + * 累加执行统计。 + * + * @param taskId 任务编号 + * @param insertedCount 新增成功数 + * @param skippedCount 跳过数 + * @param failedCount 失败数 + */ + public void addProgress(String taskId, long insertedCount, long skippedCount, long failedCount) { + TaskRecord record = requireRecord(taskId); + synchronized (record) { + record.insertedCount += insertedCount; + record.skippedCount += skippedCount; + record.failedCount += failedCount; + } + } + + /** + * 标记任务成功。 + * + * @param taskId 任务编号 + */ + public void markSuccess(String taskId) { + TaskRecord record = requireRecord(taskId); + synchronized (record) { + record.status = AddDataTaskStatusEnum.SUCCESS; + record.endTime = LocalDateTime.now(); + record.currentBatchInfo = "执行完成"; + } + } + + /** + * 标记任务失败。 + * + * @param taskId 任务编号 + * @param failureReason 失败原因 + */ + public void markFailed(String taskId, String failureReason) { + TaskRecord record = requireRecord(taskId); + synchronized (record) { + record.status = AddDataTaskStatusEnum.FAILED; + record.endTime = LocalDateTime.now(); + record.failureReason = failureReason; + record.currentBatchInfo = "执行失败"; + } + } + + /** + * 查询任务状态。 + * + * @param taskId 任务编号 + * @return 状态快照 + */ + public AddDataTaskStatusVO getStatus(String taskId) { + return copySnapshot(requireRecord(taskId)); + } + + /** + * 获取任务命令。 + * + * @param taskId 任务编号 + * @return 任务命令 + */ + public AddDataTaskCommand getCommand(String taskId) { + return requireRecord(taskId).command; + } + + /** + * 获取全部任务编号。 + * + * @return 任务编号列表 + */ + public List getTaskIds() { + return new ArrayList(taskRecordMap.keySet()); + } + + /** + * 复制状态快照。 + * + * @param record 任务记录 + * @return 状态快照 + */ + private AddDataTaskStatusVO copySnapshot(TaskRecord record) { + synchronized (record) { + AddDataTaskStatusVO status = new AddDataTaskStatusVO(); + status.setTaskId(record.taskId); + status.setStatus(record.status.name()); + status.setCurrentTableName(record.currentTableName); + status.setCurrentBatchInfo(record.currentBatchInfo); + status.setInsertedCount(record.insertedCount); + status.setSkippedCount(record.skippedCount); + status.setFailedCount(record.failedCount); + status.setFailureReason(record.failureReason); + status.setHourlyTimeResults(new ArrayList(record.hourlyTimeResults)); + status.setStartTime(AddDataDateTimeUtil.format(record.startTime)); + status.setEndTime(AddDataDateTimeUtil.format(record.endTime)); + return status; + } + } + + /** + * 根据任务时间范围生成前端展示的每小时时刻。 + * + * @param command 任务命令 + * @return 每小时时刻文本 + */ + private List buildHourlyTimeResults(AddDataTaskCommand command) { + List timeSlots = addDataTimeSlotCalculator.buildHourlyTimeSlots(command.getStartTime(), command.getEndTime()); + List result = new ArrayList(timeSlots.size()); + for (LocalDateTime timeSlot : timeSlots) { + result.add(AddDataDateTimeUtil.format(timeSlot)); + } + return result; + } + + /** + * 获取任务记录。 + * + * @param taskId 任务编号 + * @return 任务记录 + */ + private TaskRecord requireRecord(String taskId) { + TaskRecord record = taskRecordMap.get(taskId); + if (record == null) { + throw new IllegalArgumentException("未找到补数任务:" + taskId); + } + return record; + } + + /** + * 内存态任务记录。 + */ + private static final class TaskRecord { + + /** 任务编号。 */ + private final String taskId; + /** 任务命令。 */ + private final AddDataTaskCommand command; + /** 每小时返回给前端展示的业务时刻。 */ + private final List hourlyTimeResults; + /** 当前状态。 */ + private AddDataTaskStatusEnum status = AddDataTaskStatusEnum.WAITING; + /** 当前表名。 */ + private String currentTableName; + /** 当前批次。 */ + private String currentBatchInfo = "等待执行"; + /** 成功数。 */ + private long insertedCount; + /** 跳过数。 */ + private long skippedCount; + /** 失败数。 */ + private long failedCount; + /** 失败原因。 */ + private String failureReason; + /** 开始时间。 */ + private LocalDateTime startTime; + /** 结束时间。 */ + private LocalDateTime endTime; + + private TaskRecord(String taskId, AddDataTaskCommand command, List hourlyTimeResults) { + this.taskId = taskId; + this.command = command; + this.hourlyTimeResults = hourlyTimeResults; + } + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTemplateRegistry.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTemplateRegistry.java new file mode 100644 index 0000000..ac0b855 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTemplateRegistry.java @@ -0,0 +1,97 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * 前端展示模板注册器。 + */ +@Component +public class AddDataTemplateRegistry { + + /** 模板列表。 */ + private final List templates; + + public AddDataTemplateRegistry() { + List result = new ArrayList(); + result.add(createTemplate("电压有效值", "data_v", "ABC", Arrays.asList("A", "B", "C"), + "RMS_MAX", "RMS_MIN", "主值字段 RMS", "RMS_CP95")); + result.add(createTemplate("线电压有效值", "data_v", "T", Arrays.asList("T"), + "RMSAB/RMSBC/RMSCA 对应 MAX 字段", "RMSAB/RMSBC/RMSCA 对应 MIN 字段", "主值字段 RMSAB/RMSBC/RMSCA", "RMSAB/RMSBC/RMSCA 对应 CP95 字段")); + result.add(createTemplate("频率", "data_v", "T", Arrays.asList("T"), + "FREQ_MAX", "FREQ_MIN", "主值字段 FREQ", "FREQ_CP95")); + result.add(createTemplate("电压总谐波畸变率", "data_v", "ABC", Arrays.asList("A", "B", "C"), + "V_THD_MAX", "V_THD_MIN", "主值字段 V_THD", "V_THD_CP95")); + result.add(createTemplate("电流有效值", "data_i", "ABC", Arrays.asList("A", "B", "C"), + "RMS_MAX", "RMS_MIN", "主值字段 RMS", "RMS_CP95")); + result.add(createTemplate("电流总谐波畸变率", "data_i", "ABC", Arrays.asList("A", "B", "C"), + "I_THD_MAX", "I_THD_MIN", "主值字段 I_THD", "I_THD_CP95")); + result.add(createTemplate("电压谐波幅值", "data_harmphasic_v", "ABC", Arrays.asList("A", "B", "C"), + "各次 V_n_MAX", "各次 V_n_MIN", "主值字段 V_n", "各次 V_n_CP95")); + result.add(createTemplate("电流谐波幅值", "data_harmphasic_i", "ABC", Arrays.asList("A", "B", "C"), + "各次 I_n_MAX", "各次 I_n_MIN", "主值字段 I_n", "各次 I_n_CP95")); + result.add(createTemplate("电压谐波含有率", "data_harmrate_v", "ABC", Arrays.asList("A", "B", "C"), + "各次 V_n_MAX", "各次 V_n_MIN", "主值字段 V_n", "各次 V_n_CP95")); + result.add(createTemplate("电流谐波含有率", "data_harmrate_i", "ABC", Arrays.asList("A", "B", "C"), + "各次 I_n_MAX", "各次 I_n_MIN", "主值字段 I_n", "各次 I_n_CP95")); + result.add(createTemplate("间谐波电流", "data_inharm_i", "ABC", Arrays.asList("A", "B", "C"), + "各次 I_n_MAX", "各次 I_n_MIN", "主值字段 I_n", "各次 I_n_CP95")); + result.add(createTemplate("有功谐波功率", "data_harmpower_p", "ABC", Arrays.asList("A", "B", "C"), + "P_MAX 与各次 P_n_MAX", "P_MIN 与各次 P_n_MIN", "主值字段 P / P_n", "P_CP95 与各次 P_n_CP95")); + result.add(createTemplate("无功谐波功率", "data_harmpower_q", "ABC", Arrays.asList("A", "B", "C"), + "Q_MAX 与各次 Q_n_MAX", "Q_MIN 与各次 Q_n_MIN", "主值字段 Q / Q_n", "Q_CP95 与各次 Q_n_CP95")); + result.add(createTemplate("视在谐波功率", "data_harmpower_s", "ABC", Arrays.asList("A", "B", "C"), + "S_MAX 与各次 S_n_MAX", "S_MIN 与各次 S_n_MIN", "主值字段 S / S_n", "S_CP95 与各次 S_n_CP95")); + result.add(createTemplate("电压波动", "data_fluc", "T", Arrays.asList("T"), + "当前表无独立最大值字段", "当前表无独立最小值字段", "主值字段 FLUC", "当前表无独立 CP95 字段")); + result.add(createTemplate("短时闪变", "data_flicker", "T", Arrays.asList("T"), + "当前表无独立最大值字段", "当前表无独立最小值字段", "主值字段 PST", "当前表无独立 CP95 字段")); + result.add(createTemplate("长时闪变", "data_plt", "T", Arrays.asList("T"), + "当前表无独立最大值字段", "当前表无独立最小值字段", "主值字段 PLT", "当前表无独立 CP95 字段")); + templates = Collections.unmodifiableList(result); + } + + /** + * 返回模板列表。 + * + * @return 模板列表 + */ + public List getTemplates() { + return templates; + } + + /** + * 创建模板对象。 + * + * @param parameterName 参数名称 + * @param tableName 表名 + * @param phaseDisplay 展示相别 + * @param phaseCodes 落库相别 + * @param maxRule 最大值规则 + * @param minRule 最小值规则 + * @param averageRule 平均值规则 + * @param cp95Rule cp95 规则 + * @return 模板对象 + */ + private AddDataTemplateVO createTemplate(String parameterName, String tableName, String phaseDisplay, List phaseCodes, + String maxRule, String minRule, String averageRule, String cp95Rule) { + AddDataTemplateVO template = new AddDataTemplateVO(); + template.setParameterName(parameterName); + template.setTableName(tableName); + template.setPhaseDisplay(phaseDisplay); + template.setPhaseCodes(new ArrayList(phaseCodes)); + template.setDisplay(true); + template.setShowQualified(true); + template.setMaxValueRule(maxRule); + template.setMinValueRule(minRule); + template.setAverageValueRule(averageRule); + template.setCp95ValueRule(cp95Rule); + template.setDecimalScale(2); + return template; + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTimeSlotCalculator.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTimeSlotCalculator.java new file mode 100644 index 0000000..933100b --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataTimeSlotCalculator.java @@ -0,0 +1,77 @@ +package com.njcn.gather.tool.adddata.component; + +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; + +/** + * 时间槽计算组件。 + */ +@Component +public class AddDataTimeSlotCalculator { + + /** + * 构建自然时间槽列表。 + * + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param intervalMinutes 步长,单位分钟 + * @return 时间槽列表 + */ + public List buildTimeSlots(LocalDateTime startTime, LocalDateTime endTime, int intervalMinutes) { + if (intervalMinutes <= 0) { + throw new IllegalArgumentException("时间步长必须大于 0"); + } + List result = new ArrayList(); + LocalDateTime cursor = alignToNextSlot(startTime, intervalMinutes); + while (!cursor.isAfter(endTime)) { + result.add(cursor); + cursor = cursor.plusMinutes(intervalMinutes); + } + return result; + } + + /** + * 构建返回给前端展示的每小时自然时刻列表。 + * + * @param startTime 开始时间 + * @param endTime 结束时间 + * @return 每小时自然时刻列表 + */ + public List buildHourlyTimeSlots(LocalDateTime startTime, LocalDateTime endTime) { + return buildTimeSlots(startTime, endTime, 60); + } + + /** + * 计算时间槽数量。 + * + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param intervalMinutes 步长 + * @return 时间槽数量 + */ + public long countTimeSlots(LocalDateTime startTime, LocalDateTime endTime, int intervalMinutes) { + return buildTimeSlots(startTime, endTime, intervalMinutes).size(); + } + + /** + * 向上对齐到最近的自然时间槽。 + * + * @param time 原始时间 + * @param intervalMinutes 步长 + * @return 对齐后的时间 + */ + public LocalDateTime alignToNextSlot(LocalDateTime time, int intervalMinutes) { + LocalDateTime minuteFloor = time.truncatedTo(ChronoUnit.MINUTES); + int minuteOfDay = minuteFloor.getHour() * 60 + minuteFloor.getMinute(); + int remainder = minuteOfDay % intervalMinutes; + LocalDateTime aligned = minuteFloor.minusMinutes(remainder); + if (remainder != 0 || !minuteFloor.equals(time)) { + aligned = aligned.plusMinutes(intervalMinutes); + } + return aligned; + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataValueGenerator.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataValueGenerator.java new file mode 100644 index 0000000..415e20d --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataValueGenerator.java @@ -0,0 +1,579 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition; +import org.springframework.stereotype.Component; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SplittableRandom; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * 电能质量数据生成器。 + */ +@Component +public class AddDataValueGenerator { + + /** 谐波列匹配。 */ + private static final Pattern HARMONIC_PATTERN = Pattern.compile("^([VIQPS])_(\\d+)$"); + + /** 基础列后缀。 */ + private static final String SUFFIX_MAX = "_MAX"; + private static final String SUFFIX_MIN = "_MIN"; + private static final String SUFFIX_CP95 = "_CP95"; + + /** 两倍 PI。 */ + private static final double TWO_PI = Math.PI * 2D; + + /** + * 生成一行落库值。 + * + * @param definition 表定义 + * @param lineId 监测点 ID + * @param timeId 时间点 + * @param phaseCode 相别 + * @return 与字段顺序一致的值列表 + */ + public List generateRow(AddDataTableDefinition definition, String lineId, LocalDateTime timeId, String phaseCode) { + MeasurementState state = buildState(lineId, timeId, phaseCode); + Map baseValues = buildBaseValues(definition, state); + List row = new ArrayList(definition.getColumns().size()); + for (String column : definition.getColumns()) { + if ("TIMEID".equals(column)) { + row.add(Timestamp.valueOf(timeId)); + continue; + } + if ("LINEID".equals(column)) { + row.add(lineId); + continue; + } + if ("PHASIC_TYPE".equals(column)) { + row.add(phaseCode); + continue; + } + if ("QUALITYFLAG".equals(column)) { + row.add(1); + continue; + } + row.add(resolveColumnValue(definition.getTableName(), column, baseValues, state)); + } + return row; + } + + /** + * 按列名解析字段值。 + * + * @param tableName 表名 + * @param column 列名 + * @param baseValues 主值集合 + * @param state 基础状态 + * @return 对应值 + */ + private Double resolveColumnValue(String tableName, String column, Map baseValues, MeasurementState state) { + if (baseValues.containsKey(column)) { + return baseValues.get(column); + } + if (column.endsWith(SUFFIX_MAX)) { + return deriveMetric(baseValues.get(removeSuffix(column, SUFFIX_MAX)), column, state, MetricType.MAX); + } + if (column.endsWith(SUFFIX_MIN)) { + return deriveMetric(baseValues.get(removeSuffix(column, SUFFIX_MIN)), column, state, MetricType.MIN); + } + if (column.endsWith(SUFFIX_CP95)) { + return deriveMetric(baseValues.get(removeSuffix(column, SUFFIX_CP95)), column, state, MetricType.CP95); + } + throw new IllegalStateException("未支持的 add-data 列:" + tableName + "." + column); + } + + /** + * 生成主值列集合。 + * + * @param definition 表定义 + * @param state 基础状态 + * @return 主值集合 + */ + private Map buildBaseValues(AddDataTableDefinition definition, MeasurementState state) { + Map values = new HashMap(); + for (String column : definition.getColumns()) { + if (isInfrastructureColumn(column) || isDerivedColumn(column)) { + continue; + } + values.put(column, resolveBaseMetric(definition.getTableName(), column, state)); + } + return values; + } + + /** + * 解析基础主值。 + * + * @param tableName 表名 + * @param column 列名 + * @param state 基础状态 + * @return 主值 + */ + private Double resolveBaseMetric(String tableName, String column, MeasurementState state) { + Matcher matcher = HARMONIC_PATTERN.matcher(column); + if (matcher.matches()) { + String prefix = matcher.group(1); + int order = Integer.parseInt(matcher.group(2)); + if ("V".equals(prefix)) { + if ("data_harmrate_v".equals(tableName)) { + return resolveRatioPercent(state.vHarmonics[order], state.vHarmonics[1]); + } + return state.vHarmonics[order]; + } + if ("I".equals(prefix)) { + if ("data_harmrate_i".equals(tableName)) { + return resolveRatioPercent(state.iHarmonics[order], state.iHarmonics[1]); + } + if ("data_inharm_i".equals(tableName)) { + return state.iInharmonics[order]; + } + return state.iHarmonics[order]; + } + if ("P".equals(prefix)) { + return state.pHarmonics[order]; + } + if ("Q".equals(prefix)) { + return state.qHarmonics[order]; + } + if ("S".equals(prefix)) { + return state.sHarmonics[order]; + } + } + + switch (column) { + case "RMS": + return "data_v".equals(tableName) ? state.phaseVoltage : state.currentRms; + case "RMSAB": + return state.lineVoltageAB; + case "RMSBC": + return state.lineVoltageBC; + case "RMSCA": + return state.lineVoltageCA; + case "VU_DEV": + return state.vUpperDeviation; + case "VL_DEV": + return state.vLowerDeviation; + case "FREQ": + return state.frequency; + case "FREQ_DEV": + return state.frequencyDeviation; + case "V_UNBALANCE": + return state.vUnbalance; + case "V_POS": + return state.vPositive; + case "V_NEG": + return state.vNegative; + case "V_ZERO": + return state.vZero; + case "V_THD": + return state.vThd; + case "I_UNBALANCE": + return state.iUnbalance; + case "I_POS": + return state.iPositive; + case "I_NEG": + return state.iNegative; + case "I_ZERO": + return state.iZero; + case "I_THD": + return state.iThd; + case "PF": + return state.powerFactor; + case "DF": + return state.distortionFactor; + case "P": + return state.activePower; + case "Q": + return state.reactivePower; + case "S": + return state.apparentPower; + case "FLUC": + return state.fluc; + case "PST": + return state.pst; + case "PLT": + return state.plt; + case "FLUCCF": + return state.flucCf; + default: + throw new IllegalStateException("未支持的基础指标列:" + tableName + "." + column); + } + } + + /** + * 生成主值派生指标。 + * + * @param baseValue 主值 + * @param column 派生列名 + * @param state 基础状态 + * @param metricType 派生类型 + * @return 派生值 + */ + private Double deriveMetric(Double baseValue, String column, MeasurementState state, MetricType metricType) { + if (baseValue == null) { + throw new IllegalStateException("派生字段缺少主值:" + column); + } + double factor = noise(state.sharedSeed + column.hashCode(), 0.01D, 0.05D); + double delta = Math.max(Math.abs(baseValue) * factor, 0.005D); + double value; + if (MetricType.MAX.equals(metricType)) { + value = baseValue + delta; + } else if (MetricType.MIN.equals(metricType)) { + value = baseValue - delta; + if (baseValue >= 0D) { + value = Math.max(0D, value); + } + } else { + value = baseValue + delta * 0.6D; + } + return round(value, 4); + } + + /** + * 构建同源基础状态。 + * + * @param lineId 监测点 ID + * @param timeId 时间 + * @param phaseCode 相别 + * @return 基础状态 + */ + private MeasurementState buildState(String lineId, LocalDateTime timeId, String phaseCode) { + long sharedSeed = Objects.hash(lineId, timeId.getYear(), timeId.getDayOfYear(), timeId.getHour(), timeId.getMinute()); + long phaseSeed = Objects.hash(sharedSeed, phaseCode); + double minuteOfDay = timeId.getHour() * 60D + timeId.getMinute(); + double dayWave = Math.sin(minuteOfDay / 1440D * TWO_PI); + double fastWave = Math.cos(minuteOfDay / 240D * TWO_PI); + double phaseWave = Math.sin(minuteOfDay / 1440D * TWO_PI + resolvePhaseOffset(phaseCode)); + + double phaseVoltage = round(220D + dayWave * 3.6D + phaseWave * 1.5D + noise(phaseSeed + 11L, -0.9D, 0.9D), 4); + double lineVoltageAB = round(380D + dayWave * 4.8D + noise(sharedSeed + 21L, -1.2D, 1.2D), 4); + double lineVoltageBC = round(381D + fastWave * 3.6D + noise(sharedSeed + 22L, -1.2D, 1.2D), 4); + double lineVoltageCA = round(379D + Math.sin(minuteOfDay / 720D * TWO_PI) * 4.2D + noise(sharedSeed + 23L, -1.2D, 1.2D), 4); + double frequency = round(50D + Math.sin(minuteOfDay / 180D * TWO_PI) * 0.03D + noise(sharedSeed + 31L, -0.01D, 0.01D), 4); + double frequencyDeviation = round(frequency - 50D, 4); + + double currentRms = round(85D + fastWave * 12D + phaseWave * 6D + noise(phaseSeed + 41L, -2D, 2D), 4); + double vPositive = round(phaseVoltage * (0.992D + noise(sharedSeed + 51L, -0.003D, 0.003D)), 4); + double vNegative = round(Math.max(0D, phaseVoltage * (0.008D + noise(phaseSeed + 52L, 0D, 0.006D))), 4); + double vZero = round(Math.max(0D, phaseVoltage * (0.006D + noise(phaseSeed + 53L, 0D, 0.004D))), 4); + double vUnbalance = resolveRatioPercent(vNegative, vPositive); + double vUpperDeviation = round(Math.max((phaseVoltage - 220D) / 220D * 100D, 0D), 4); + double vLowerDeviation = round(Math.max((220D - phaseVoltage) / 220D * 100D, 0D), 4); + + double iPositive = round(currentRms * (0.986D + noise(sharedSeed + 61L, -0.003D, 0.003D)), 4); + double iNegative = round(Math.max(0D, currentRms * (0.016D + noise(phaseSeed + 62L, 0D, 0.01D))), 4); + double iZero = round(Math.max(0D, currentRms * (0.011D + noise(phaseSeed + 63L, 0D, 0.008D))), 4); + double iUnbalance = resolveRatioPercent(iNegative, iPositive); + + double[] vHarmonics = new double[51]; + double[] iHarmonics = new double[51]; + double[] iInharmonics = new double[51]; + vHarmonics[1] = phaseVoltage; + iHarmonics[1] = currentRms; + iInharmonics[1] = round(currentRms * 0.06D, 4); + for (int order = 2; order <= 50; order++) { + double voltageRatio = (0.015D / order) * (1D + noise(sharedSeed + order, -0.35D, 0.35D)); + double currentRatio = (0.08D / Math.sqrt(order)) * (1D + noise(phaseSeed + order, -0.4D, 0.4D)); + vHarmonics[order] = round(Math.max(0D, phaseVoltage * voltageRatio), 4); + iHarmonics[order] = round(Math.max(0D, currentRms * currentRatio), 4); + iInharmonics[order] = round(Math.max(0D, iHarmonics[order] * noise(phaseSeed + order * 13L, 0.08D, 0.18D)), 4); + } + + double vThd = round(resolveThd(vHarmonics), 4); + double iThd = round(resolveThd(iHarmonics), 4); + + double[] pHarmonics = new double[51]; + double[] qHarmonics = new double[51]; + double[] sHarmonics = new double[51]; + double activePower = 0D; + double reactivePower = 0D; + double apparentPower = 0D; + for (int order = 1; order <= 50; order++) { + double apparent = Math.max(0D, round(vHarmonics[order] * iHarmonics[order] / 1000D, 4)); + double angle = 0.16D + order * 0.008D + noise(sharedSeed + order * 17L, -0.03D, 0.03D); + double active = round(apparent * Math.cos(angle), 4); + double reactive = round(apparent * Math.sin(angle), 4); + pHarmonics[order] = active; + qHarmonics[order] = reactive; + sHarmonics[order] = round(Math.sqrt(active * active + reactive * reactive), 4); + activePower += active; + reactivePower += reactive; + apparentPower += sHarmonics[order]; + } + activePower = round(activePower, 4); + reactivePower = round(reactivePower, 4); + apparentPower = round(apparentPower, 4); + double powerFactor = apparentPower == 0D ? 1D : round(clamp(activePower / apparentPower, -1D, 1D), 4); + double distortionFactor = round(1D / Math.sqrt(1D + Math.pow(vThd / 100D, 2D) + Math.pow(iThd / 100D, 2D)), 4); + + double fluc = round(0.35D + Math.abs(dayWave) * 0.18D + noise(sharedSeed + 71L, -0.03D, 0.03D), 4); + double pst = round(fluc * 1.12D + 0.08D + noise(sharedSeed + 72L, -0.02D, 0.02D), 4); + double plt = round(Math.max(0.01D, pst * 0.92D + 0.05D + noise(sharedSeed + 73L, -0.02D, 0.02D)), 4); + double flucCf = round(fluc * (0.96D + noise(sharedSeed + 74L, -0.03D, 0.03D)), 4); + + return new MeasurementState(sharedSeed, phaseVoltage, lineVoltageAB, lineVoltageBC, lineVoltageCA, + frequency, frequencyDeviation, currentRms, vPositive, vNegative, vZero, vUnbalance, + vUpperDeviation, vLowerDeviation, iPositive, iNegative, iZero, iUnbalance, + vHarmonics, iHarmonics, iInharmonics, vThd, iThd, pHarmonics, qHarmonics, sHarmonics, + activePower, reactivePower, apparentPower, powerFactor, distortionFactor, fluc, pst, plt, flucCf); + } + + /** + * 解析百分比。 + * + * @param part 分子 + * @param total 分母 + * @return 百分比 + */ + private double resolveRatioPercent(double part, double total) { + if (total == 0D) { + return 0D; + } + return round(part / total * 100D, 4); + } + + /** + * 解析 THD。 + * + * @param harmonics 谐波数组 + * @return thd 百分比 + */ + private double resolveThd(double[] harmonics) { + if (harmonics[1] == 0D) { + return 0D; + } + double sum = 0D; + for (int order = 2; order < harmonics.length; order++) { + sum += harmonics[order] * harmonics[order]; + } + return Math.sqrt(sum) / harmonics[1] * 100D; + } + + /** + * 解析相别偏移角。 + * + * @param phaseCode 相别 + * @return 偏移角 + */ + private double resolvePhaseOffset(String phaseCode) { + if ("B".equals(phaseCode)) { + return TWO_PI / 3D; + } + if ("C".equals(phaseCode)) { + return TWO_PI / 3D * 2D; + } + return 0D; + } + + /** + * 生成稳定随机扰动。 + * + * @param seed 种子 + * @param min 最小值 + * @param max 最大值 + * @return 扰动值 + */ + private double noise(long seed, double min, double max) { + return min + (max - min) * new SplittableRandom(seed).nextDouble(); + } + + /** + * 截断小数位。 + * + * @param value 原始值 + * @param scale 保留位数 + * @return 处理后的值 + */ + private double round(double value, int scale) { + if (Math.abs(value) < 0.0000001D) { + return 0D; + } + return BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_UP).doubleValue(); + } + + /** + * 移除列后缀。 + * + * @param column 列名 + * @param suffix 后缀 + * @return 主值列名 + */ + private String removeSuffix(String column, String suffix) { + return column.substring(0, column.length() - suffix.length()); + } + + /** + * 判断是否为系统字段。 + * + * @param column 列名 + * @return true 表示系统字段 + */ + private boolean isInfrastructureColumn(String column) { + return "TIMEID".equals(column) || "LINEID".equals(column) || "PHASIC_TYPE".equals(column) || "QUALITYFLAG".equals(column); + } + + /** + * 判断是否为派生列。 + * + * @param column 列名 + * @return true 表示派生列 + */ + private boolean isDerivedColumn(String column) { + return column.endsWith(SUFFIX_MAX) || column.endsWith(SUFFIX_MIN) || column.endsWith(SUFFIX_CP95); + } + + /** + * 数值类型。 + */ + private enum MetricType { + /** 最大值。 */ + MAX, + /** 最小值。 */ + MIN, + /** cp95。 */ + CP95 + } + + /** + * 数值约束。 + * + * @param value 值 + * @param min 最小值 + * @param max 最大值 + * @return 限幅结果 + */ + private double clamp(double value, double min, double max) { + return Math.max(min, Math.min(max, value)); + } + + /** + * 同源基础状态。 + */ + private static final class MeasurementState { + + /** 共享种子。 */ + private final long sharedSeed; + /** 电压有效值。 */ + private final double phaseVoltage; + /** AB 线电压。 */ + private final double lineVoltageAB; + /** BC 线电压。 */ + private final double lineVoltageBC; + /** CA 线电压。 */ + private final double lineVoltageCA; + /** 频率。 */ + private final double frequency; + /** 频偏。 */ + private final double frequencyDeviation; + /** 电流有效值。 */ + private final double currentRms; + /** 电压正序。 */ + private final double vPositive; + /** 电压负序。 */ + private final double vNegative; + /** 电压零序。 */ + private final double vZero; + /** 电压不平衡度。 */ + private final double vUnbalance; + /** 电压上偏差。 */ + private final double vUpperDeviation; + /** 电压下偏差。 */ + private final double vLowerDeviation; + /** 电流正序。 */ + private final double iPositive; + /** 电流负序。 */ + private final double iNegative; + /** 电流零序。 */ + private final double iZero; + /** 电流不平衡度。 */ + private final double iUnbalance; + /** 电压谐波。 */ + private final double[] vHarmonics; + /** 电流谐波。 */ + private final double[] iHarmonics; + /** 间谐波电流。 */ + private final double[] iInharmonics; + /** 电压 thd。 */ + private final double vThd; + /** 电流 thd。 */ + private final double iThd; + /** 有功谐波功率。 */ + private final double[] pHarmonics; + /** 无功谐波功率。 */ + private final double[] qHarmonics; + /** 视在谐波功率。 */ + private final double[] sHarmonics; + /** 总有功。 */ + private final double activePower; + /** 总无功。 */ + private final double reactivePower; + /** 总视在。 */ + private final double apparentPower; + /** 功率因数。 */ + private final double powerFactor; + /** 畸变因数。 */ + private final double distortionFactor; + /** 波动值。 */ + private final double fluc; + /** 短时闪变。 */ + private final double pst; + /** 长时闪变。 */ + private final double plt; + /** 波动修正值。 */ + private final double flucCf; + + private MeasurementState(long sharedSeed, double phaseVoltage, double lineVoltageAB, double lineVoltageBC, double lineVoltageCA, + double frequency, double frequencyDeviation, double currentRms, double vPositive, double vNegative, + double vZero, double vUnbalance, double vUpperDeviation, double vLowerDeviation, double iPositive, + double iNegative, double iZero, double iUnbalance, double[] vHarmonics, double[] iHarmonics, + double[] iInharmonics, double vThd, double iThd, double[] pHarmonics, double[] qHarmonics, + double[] sHarmonics, double activePower, double reactivePower, double apparentPower, + double powerFactor, double distortionFactor, double fluc, double pst, double plt, double flucCf) { + this.sharedSeed = sharedSeed; + this.phaseVoltage = phaseVoltage; + this.lineVoltageAB = lineVoltageAB; + this.lineVoltageBC = lineVoltageBC; + this.lineVoltageCA = lineVoltageCA; + this.frequency = frequency; + this.frequencyDeviation = frequencyDeviation; + this.currentRms = currentRms; + this.vPositive = vPositive; + this.vNegative = vNegative; + this.vZero = vZero; + this.vUnbalance = vUnbalance; + this.vUpperDeviation = vUpperDeviation; + this.vLowerDeviation = vLowerDeviation; + this.iPositive = iPositive; + this.iNegative = iNegative; + this.iZero = iZero; + this.iUnbalance = iUnbalance; + this.vHarmonics = vHarmonics; + this.iHarmonics = iHarmonics; + this.iInharmonics = iInharmonics; + this.vThd = vThd; + this.iThd = iThd; + this.pHarmonics = pHarmonics; + this.qHarmonics = qHarmonics; + this.sHarmonics = sHarmonics; + this.activePower = activePower; + this.reactivePower = reactivePower; + this.apparentPower = apparentPower; + this.powerFactor = powerFactor; + this.distortionFactor = distortionFactor; + this.fluc = fluc; + this.pst = pst; + this.plt = plt; + this.flucCf = flucCf; + } + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/config/AddDataExecutorConfig.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/config/AddDataExecutorConfig.java new file mode 100644 index 0000000..f615e6c --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/config/AddDataExecutorConfig.java @@ -0,0 +1,37 @@ +package com.njcn.gather.tool.adddata.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * add-data 模块任务线程池配置。 + */ +@Slf4j +@Configuration +public class AddDataExecutorConfig { + + @Bean(name = "addDataTaskExecutorService", destroyMethod = "shutdown") + public ExecutorService addDataTaskExecutorService() { + AtomicInteger threadIndex = new AtomicInteger(1); + return new ThreadPoolExecutor( + 1, + 1, + 30, + TimeUnit.SECONDS, + new LinkedBlockingQueue(8), + runnable -> { + Thread thread = new Thread(runnable); + thread.setName("add-data-task-" + threadIndex.getAndIncrement()); + return thread; + }, + (runnable, executor) -> log.warn("数据补录任务线程池已满,拒绝新的补数任务") + ); + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataTaskController.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataTaskController.java new file mode 100644 index 0000000..0f6b7ff --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataTaskController.java @@ -0,0 +1,90 @@ +package com.njcn.gather.tool.adddata.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.LogUtil; +import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewVO; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO; +import com.njcn.gather.tool.adddata.service.AddDataTaskService; +import com.njcn.web.controller.BaseController; +import com.njcn.web.utils.HttpResultUtil; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 数据补录任务接口。 + */ +@Validated +@Slf4j +@Api(tags = "数据补录任务") +@RestController +@RequestMapping("/addData/task") +@RequiredArgsConstructor +public class AddDataTaskController extends BaseController { + + /** 数据补录任务服务。 */ + private final AddDataTaskService addDataTaskService; + + /** + * 预估本次补数规模。 + * + * @param param 补数参数 + * @return 预估结果 + */ + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("预估电能质量批量补数规模") + @PostMapping("/preview") + public HttpResult preview(@RequestBody @Validated AddDataTaskRequestParam param) { + String methodDescribe = getMethodDescribe("preview"); + LogUtil.njcnDebug(log, "{},开始预估补数规模,lineCount={}, intervalMinutes={}", + methodDescribe, param.getLineIds() == null ? 0 : param.getLineIds().size(), param.getIntervalMinutes()); + AddDataPreviewVO result = addDataTaskService.preview(param); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); + } + + /** + * 创建后台补数任务。 + * + * @param param 补数参数 + * @return 任务编号 + */ + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("创建电能质量批量补数任务") + @PostMapping("/create") + public HttpResult create(@RequestBody @Validated AddDataTaskRequestParam param) { + String methodDescribe = getMethodDescribe("create"); + LogUtil.njcnDebug(log, "{},开始创建补数任务,lineCount={}, intervalMinutes={}", + methodDescribe, param.getLineIds() == null ? 0 : param.getLineIds().size(), param.getIntervalMinutes()); + AddDataTaskCreateVO result = addDataTaskService.create(param); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); + } + + /** + * 查询任务状态。 + * + * @param taskId 任务编号 + * @return 当前任务状态 + */ + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("查询电能质量批量补数任务状态") + @GetMapping("/status/{taskId}") + public HttpResult status(@PathVariable("taskId") String taskId) { + String methodDescribe = getMethodDescribe("status"); + LogUtil.njcnDebug(log, "{},开始查询补数任务状态,taskId={}", methodDescribe, taskId); + AddDataTaskStatusVO result = addDataTaskService.getStatus(taskId); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataTemplateController.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataTemplateController.java new file mode 100644 index 0000000..632da77 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataTemplateController.java @@ -0,0 +1,49 @@ +package com.njcn.gather.tool.adddata.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.LogUtil; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO; +import com.njcn.gather.tool.adddata.service.AddDataTemplateService; +import com.njcn.web.controller.BaseController; +import com.njcn.web.utils.HttpResultUtil; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * 数据补录模板展示接口。 + */ +@Slf4j +@Api(tags = "数据补录模板") +@RestController +@RequestMapping("/addData/template") +@RequiredArgsConstructor +public class AddDataTemplateController extends BaseController { + + /** 数据补录模板服务。 */ + private final AddDataTemplateService addDataTemplateService; + + /** + * 返回前端模板展示规则。 + * + * @return 模板列表 + */ + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("查询数据补录模板规则") + @GetMapping("/list") + public HttpResult> list() { + String methodDescribe = getMethodDescribe("list"); + LogUtil.njcnDebug(log, "{},开始查询数据补录模板规则", methodDescribe); + List result = addDataTemplateService.listTemplates(); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataBatchWriteResult.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataBatchWriteResult.java new file mode 100644 index 0000000..30f440e --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataBatchWriteResult.java @@ -0,0 +1,38 @@ +package com.njcn.gather.tool.adddata.pojo.bo; + +import lombok.Getter; + +/** + * 批量写入结果。 + */ +@Getter +public class AddDataBatchWriteResult { + + /** 成功插入条数。 */ + private final long insertedCount; + + /** 因主键重复被跳过的条数。 */ + private final long skippedCount; + + /** 非主键冲突失败条数。 */ + private final long failedCount; + + /** 第一条失败原因。 */ + private final String firstFailureMessage; + + public AddDataBatchWriteResult(long insertedCount, long skippedCount, long failedCount, String firstFailureMessage) { + this.insertedCount = insertedCount; + this.skippedCount = skippedCount; + this.failedCount = failedCount; + this.firstFailureMessage = firstFailureMessage; + } + + /** + * 当前批次是否存在失败。 + * + * @return true 表示存在失败 + */ + public boolean hasFailure() { + return failedCount > 0; + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataTableDefinition.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataTableDefinition.java new file mode 100644 index 0000000..07255f8 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataTableDefinition.java @@ -0,0 +1,64 @@ +package com.njcn.gather.tool.adddata.pojo.bo; + +import lombok.Getter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * 数据补录表定义。 + */ +@Getter +public class AddDataTableDefinition { + + /** 分表时间轴类型。 */ + public enum TimeAxisType { + /** 使用前端传入步长。 */ + REQUEST_INTERVAL, + /** 固定 10 分钟。 */ + FIXED_TEN_MINUTES, + /** 固定 2 小时。 */ + FIXED_TWO_HOURS + } + + /** 表名。 */ + private final String tableName; + + /** 字段列表,顺序与落库 SQL 保持一致。 */ + private final List columns; + + /** 首版实际落库相别集合。 */ + private final List phaseCodes; + + /** 单批写入大小。 */ + private final int batchSize; + + /** 时间轴类型。 */ + private final TimeAxisType timeAxisType; + + public AddDataTableDefinition(String tableName, List columns, List phaseCodes, + int batchSize, TimeAxisType timeAxisType) { + this.tableName = tableName; + this.columns = Collections.unmodifiableList(new ArrayList(columns)); + this.phaseCodes = Collections.unmodifiableList(new ArrayList(phaseCodes)); + this.batchSize = batchSize; + this.timeAxisType = timeAxisType; + } + + /** + * 解析当前表真实使用的时间步长。 + * + * @param requestIntervalMinutes 前端传入步长 + * @return 当前表步长 + */ + public int resolveIntervalMinutes(int requestIntervalMinutes) { + if (TimeAxisType.FIXED_TEN_MINUTES.equals(timeAxisType)) { + return 10; + } + if (TimeAxisType.FIXED_TWO_HOURS.equals(timeAxisType)) { + return 120; + } + return requestIntervalMinutes; + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataTaskCommand.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataTaskCommand.java new file mode 100644 index 0000000..55d3e40 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataTaskCommand.java @@ -0,0 +1,34 @@ +package com.njcn.gather.tool.adddata.pojo.bo; + +import lombok.Getter; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * 归一化后的补数任务命令。 + */ +@Getter +public class AddDataTaskCommand { + + /** 监测点 ID 列表。 */ + private final List lineIds; + + /** 开始时间。 */ + private final LocalDateTime startTime; + + /** 结束时间。 */ + private final LocalDateTime endTime; + + /** 用户步长。 */ + private final int intervalMinutes; + + public AddDataTaskCommand(List lineIds, LocalDateTime startTime, LocalDateTime endTime, int intervalMinutes) { + this.lineIds = Collections.unmodifiableList(new ArrayList(lineIds)); + this.startTime = startTime; + this.endTime = endTime; + this.intervalMinutes = intervalMinutes; + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/enums/AddDataTaskStatusEnum.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/enums/AddDataTaskStatusEnum.java new file mode 100644 index 0000000..440baeb --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/enums/AddDataTaskStatusEnum.java @@ -0,0 +1,15 @@ +package com.njcn.gather.tool.adddata.pojo.enums; + +/** + * 数据补录任务状态。 + */ +public enum AddDataTaskStatusEnum { + /** 等待执行。 */ + WAITING, + /** 执行中。 */ + RUNNING, + /** 执行成功。 */ + SUCCESS, + /** 执行失败。 */ + FAILED +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/param/AddDataTaskRequestParam.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/param/AddDataTaskRequestParam.java new file mode 100644 index 0000000..3bfe88b --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/param/AddDataTaskRequestParam.java @@ -0,0 +1,35 @@ +package com.njcn.gather.tool.adddata.pojo.param; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import java.util.ArrayList; +import java.util.List; + +/** + * 数据补录任务请求参数。 + */ +@Data +@ApiModel("数据补录任务请求参数") +public class AddDataTaskRequestParam { + + @ApiModelProperty(value = "监测点 ID 列表", required = true) + @NotEmpty(message = "监测点 ID 列表不能为空") + private List lineIds = new ArrayList(); + + @ApiModelProperty(value = "开始时间,格式 yyyy-MM-dd HH:mm:ss", required = true) + @NotBlank(message = "开始时间不能为空") + private String startTime; + + @ApiModelProperty(value = "结束时间,格式 yyyy-MM-dd HH:mm:ss", required = true) + @NotBlank(message = "结束时间不能为空") + private String endTime; + + @ApiModelProperty(value = "用户步长,仅支持 1/3/5/10 分钟", required = true) + @NotNull(message = "时间步长不能为空") + private Integer intervalMinutes; +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataPreviewTableVO.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataPreviewTableVO.java new file mode 100644 index 0000000..cf7318e --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataPreviewTableVO.java @@ -0,0 +1,22 @@ +package com.njcn.gather.tool.adddata.pojo.vo; + +import lombok.Data; + +/** + * 单表预估结果。 + */ +@Data +public class AddDataPreviewTableVO { + + /** 表名。 */ + private String tableName; + + /** 当前表命中的时间点数量。 */ + private long timePointCount; + + /** 当前表实际展开的落库相别数量。 */ + private int phaseCount; + + /** 当前表预估写入条数。 */ + private long rowCount; +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataPreviewVO.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataPreviewVO.java new file mode 100644 index 0000000..ed47e55 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataPreviewVO.java @@ -0,0 +1,25 @@ +package com.njcn.gather.tool.adddata.pojo.vo; + +import lombok.Data; + +import java.util.ArrayList; +import java.util.List; + +/** + * 补数规模预估结果。 + */ +@Data +public class AddDataPreviewVO { + + /** 监测点数量。 */ + private int lineCount; + + /** 用户步长。 */ + private int intervalMinutes; + + /** 总预估条数。 */ + private long totalRowCount; + + /** 单表预估详情。 */ + private List tableStats = new ArrayList(); +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataTaskCreateVO.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataTaskCreateVO.java new file mode 100644 index 0000000..c079385 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataTaskCreateVO.java @@ -0,0 +1,16 @@ +package com.njcn.gather.tool.adddata.pojo.vo; + +import lombok.Data; + +/** + * 创建任务返回值。 + */ +@Data +public class AddDataTaskCreateVO { + + /** 任务编号。 */ + private String taskId; + + /** 初始状态。 */ + private String status; +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataTaskStatusVO.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataTaskStatusVO.java new file mode 100644 index 0000000..6a7c1ae --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataTaskStatusVO.java @@ -0,0 +1,46 @@ +package com.njcn.gather.tool.adddata.pojo.vo; + +import lombok.Data; + +import java.util.ArrayList; +import java.util.List; + +/** + * 数据补录任务状态。 + */ +@Data +public class AddDataTaskStatusVO { + + /** 任务编号。 */ + private String taskId; + + /** 当前状态。 */ + private String status; + + /** 当前执行表名。 */ + private String currentTableName; + + /** 当前批次描述。 */ + private String currentBatchInfo; + + /** 成功写入数量。 */ + private long insertedCount; + + /** 主键重复跳过数量。 */ + private long skippedCount; + + /** 非主键冲突失败数量。 */ + private long failedCount; + + /** 失败原因。 */ + private String failureReason; + + /** 每小时返回给前端展示的业务时刻。 */ + private List hourlyTimeResults = new ArrayList(); + + /** 任务开始时间。 */ + private String startTime; + + /** 任务结束时间。 */ + private String endTime; +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataTemplateVO.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataTemplateVO.java new file mode 100644 index 0000000..c95c765 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataTemplateVO.java @@ -0,0 +1,46 @@ +package com.njcn.gather.tool.adddata.pojo.vo; + +import lombok.Data; + +import java.util.ArrayList; +import java.util.List; + +/** + * 前端展示模板规则。 + */ +@Data +public class AddDataTemplateVO { + + /** 电能质量参数名称。 */ + private String parameterName; + + /** 关联表名。 */ + private String tableName; + + /** 前端展示相别。 */ + private String phaseDisplay; + + /** 实际落库相别集合。 */ + private List phaseCodes = new ArrayList(); + + /** 当前模板是否展示。 */ + private boolean display; + + /** 是否展示是否合格列。 */ + private boolean showQualified; + + /** 最大值规则。 */ + private String maxValueRule; + + /** 最小值规则。 */ + private String minValueRule; + + /** 平均值规则。 */ + private String averageValueRule; + + /** 95% 概率大值规则。 */ + private String cp95ValueRule; + + /** 默认保留小数位数。 */ + private int decimalScale; +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/AddDataTaskService.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/AddDataTaskService.java new file mode 100644 index 0000000..dce9782 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/AddDataTaskService.java @@ -0,0 +1,36 @@ +package com.njcn.gather.tool.adddata.service; + +import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewVO; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO; + +/** + * 数据补录任务服务。 + */ +public interface AddDataTaskService { + + /** + * 预估补数规模。 + * + * @param param 补数参数 + * @return 预估结果 + */ + AddDataPreviewVO preview(AddDataTaskRequestParam param); + + /** + * 创建补数任务。 + * + * @param param 补数参数 + * @return 任务创建结果 + */ + AddDataTaskCreateVO create(AddDataTaskRequestParam param); + + /** + * 查询补数任务状态。 + * + * @param taskId 任务编号 + * @return 当前任务状态 + */ + AddDataTaskStatusVO getStatus(String taskId); +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/AddDataTemplateService.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/AddDataTemplateService.java new file mode 100644 index 0000000..a50ee75 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/AddDataTemplateService.java @@ -0,0 +1,18 @@ +package com.njcn.gather.tool.adddata.service; + +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO; + +import java.util.List; + +/** + * 数据补录模板服务。 + */ +public interface AddDataTemplateService { + + /** + * 返回前端参数模板。 + * + * @return 模板列表 + */ + List listTemplates(); +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/impl/AddDataTaskServiceImpl.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/impl/AddDataTaskServiceImpl.java new file mode 100644 index 0000000..bcef345 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/impl/AddDataTaskServiceImpl.java @@ -0,0 +1,142 @@ +package com.njcn.gather.tool.adddata.service.impl; + +import com.njcn.gather.tool.adddata.component.AddDataTableRegistry; +import com.njcn.gather.tool.adddata.component.AddDataTaskExecutor; +import com.njcn.gather.tool.adddata.component.AddDataTaskStatusHolder; +import com.njcn.gather.tool.adddata.component.AddDataTimeSlotCalculator; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand; +import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewTableVO; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataPreviewVO; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO; +import com.njcn.gather.tool.adddata.service.AddDataTaskService; +import com.njcn.gather.tool.adddata.util.AddDataDateTimeUtil; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * 数据补录任务服务实现。 + */ +@Service +@RequiredArgsConstructor +public class AddDataTaskServiceImpl implements AddDataTaskService { + + /** 支持的用户步长。 */ + private static final Set SUPPORTED_INTERVALS = new LinkedHashSet(Arrays.asList(1, 3, 5, 10)); + + /** 表定义注册器。 */ + private final AddDataTableRegistry addDataTableRegistry; + + /** 时间槽计算器。 */ + private final AddDataTimeSlotCalculator addDataTimeSlotCalculator; + + /** 任务状态持有器。 */ + private final AddDataTaskStatusHolder addDataTaskStatusHolder; + + /** 后台执行器。 */ + private final AddDataTaskExecutor addDataTaskExecutor; + + @Override + public AddDataPreviewVO preview(AddDataTaskRequestParam param) { + AddDataTaskCommand command = buildCommand(param); + AddDataPreviewVO result = new AddDataPreviewVO(); + result.setLineCount(command.getLineIds().size()); + result.setIntervalMinutes(command.getIntervalMinutes()); + long totalRowCount = 0L; + List tableStats = new ArrayList(); + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + long timePointCount = addDataTimeSlotCalculator.countTimeSlots( + command.getStartTime(), command.getEndTime(), definition.resolveIntervalMinutes(command.getIntervalMinutes())); + long rowCount = timePointCount * command.getLineIds().size() * definition.getPhaseCodes().size(); + AddDataPreviewTableVO tableVO = new AddDataPreviewTableVO(); + tableVO.setTableName(definition.getTableName()); + tableVO.setTimePointCount(timePointCount); + tableVO.setPhaseCount(definition.getPhaseCodes().size()); + tableVO.setRowCount(rowCount); + tableStats.add(tableVO); + totalRowCount += rowCount; + } + result.setTableStats(tableStats); + result.setTotalRowCount(totalRowCount); + return result; + } + + @Override + public AddDataTaskCreateVO create(AddDataTaskRequestParam param) { + AddDataTaskCommand command = buildCommand(param); + AddDataTaskStatusVO snapshot = addDataTaskStatusHolder.createWaitingTask(command); + addDataTaskExecutor.submit(snapshot.getTaskId(), command); + AddDataTaskCreateVO result = new AddDataTaskCreateVO(); + result.setTaskId(snapshot.getTaskId()); + result.setStatus(snapshot.getStatus()); + return result; + } + + @Override + public AddDataTaskStatusVO getStatus(String taskId) { + if (taskId == null || taskId.trim().isEmpty()) { + throw new IllegalArgumentException("任务编号不能为空"); + } + return addDataTaskStatusHolder.getStatus(taskId.trim()); + } + + /** + * 归一化任务命令。 + * + * @param param 请求参数 + * @return 任务命令 + */ + private AddDataTaskCommand buildCommand(AddDataTaskRequestParam param) { + if (param == null) { + throw new IllegalArgumentException("补数参数不能为空"); + } + Integer intervalMinutes = param.getIntervalMinutes(); + if (intervalMinutes == null || !SUPPORTED_INTERVALS.contains(intervalMinutes)) { + throw new IllegalArgumentException("时间步长仅支持 1、3、5、10 分钟"); + } + List lineIds = normalizeLineIds(param.getLineIds()); + LocalDateTime startTime = AddDataDateTimeUtil.parse(param.getStartTime()); + LocalDateTime endTime = AddDataDateTimeUtil.parse(param.getEndTime()); + if (startTime.isAfter(endTime)) { + throw new IllegalArgumentException("开始时间不能大于结束时间"); + } + return new AddDataTaskCommand(lineIds, startTime, endTime, intervalMinutes); + } + + /** + * 标准化监测点列表。 + * + * @param lineIds 原始监测点列表 + * @return 去重后的监测点列表 + */ + private List normalizeLineIds(List lineIds) { + if (lineIds == null || lineIds.isEmpty()) { + throw new IllegalArgumentException("监测点 ID 列表不能为空"); + } + LinkedHashSet normalized = new LinkedHashSet(); + for (String lineId : lineIds) { + if (lineId == null) { + throw new IllegalArgumentException("监测点 ID 不能为空"); + } + String normalizedLineId = lineId.trim(); + // LINEID 已改为 char(32),这里同步限制字符串长度,避免运行时写入越界。 + if (normalizedLineId.isEmpty()) { + throw new IllegalArgumentException("监测点 ID 不能为空"); + } + if (normalizedLineId.length() > 32) { + throw new IllegalArgumentException("监测点 ID 长度不能超过 32 位"); + } + normalized.add(normalizedLineId); + } + return new ArrayList(normalized); + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/impl/AddDataTemplateServiceImpl.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/impl/AddDataTemplateServiceImpl.java new file mode 100644 index 0000000..f03a567 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/impl/AddDataTemplateServiceImpl.java @@ -0,0 +1,25 @@ +package com.njcn.gather.tool.adddata.service.impl; + +import com.njcn.gather.tool.adddata.component.AddDataTemplateRegistry; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO; +import com.njcn.gather.tool.adddata.service.AddDataTemplateService; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * 数据补录模板服务实现。 + */ +@Service +@RequiredArgsConstructor +public class AddDataTemplateServiceImpl implements AddDataTemplateService { + + /** 模板注册器。 */ + private final AddDataTemplateRegistry addDataTemplateRegistry; + + @Override + public List listTemplates() { + return addDataTemplateRegistry.getTemplates(); + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/util/AddDataDateTimeUtil.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/util/AddDataDateTimeUtil.java new file mode 100644 index 0000000..bd3caaa --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/util/AddDataDateTimeUtil.java @@ -0,0 +1,51 @@ +package com.njcn.gather.tool.adddata.util; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; + +/** + * add-data 模块时间工具。 + */ +public final class AddDataDateTimeUtil { + + /** 统一返回格式。 */ + private static final DateTimeFormatter OUTPUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + /** 允许的入参时间格式。 */ + private static final DateTimeFormatter[] INPUT_FORMATTERS = new DateTimeFormatter[]{ + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"), + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"), + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"), + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm") + }; + + private AddDataDateTimeUtil() { + } + + /** + * 解析请求时间。 + * + * @param timeText 时间文本 + * @return 解析后的时间 + */ + public static LocalDateTime parse(String timeText) { + for (DateTimeFormatter formatter : INPUT_FORMATTERS) { + try { + return LocalDateTime.parse(timeText, formatter); + } catch (DateTimeParseException ignored) { + } + } + throw new IllegalArgumentException("时间格式不正确,仅支持 yyyy-MM-dd HH:mm:ss 或 yyyy-MM-dd'T'HH:mm:ss"); + } + + /** + * 格式化时间。 + * + * @param time 时间对象 + * @return 格式化结果 + */ + public static String format(LocalDateTime time) { + return time == null ? null : time.format(OUTPUT_FORMATTER); + } +} diff --git a/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTableRegistryTest.java b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTableRegistryTest.java new file mode 100644 index 0000000..75680c4 --- /dev/null +++ b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTableRegistryTest.java @@ -0,0 +1,28 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; + +/** + * 表定义注册测试。 + */ +class AddDataTableRegistryTest { + + @Test + void shouldLoadAllThirteenTablesFromSchema() throws Exception { + AddDataTableRegistry registry = new AddDataTableRegistry(); + registry.afterPropertiesSet(); + + List definitions = registry.getTableDefinitions(); + + Assertions.assertEquals(13, definitions.size()); + Assertions.assertEquals("data_flicker", definitions.get(0).getTableName()); + Assertions.assertEquals("data_v", definitions.get(definitions.size() - 1).getTableName()); + Assertions.assertTrue(registry.getDefinition("data_v").getColumns().contains("V_THD")); + Assertions.assertEquals(4, registry.getDefinition("data_v").getPhaseCodes().size()); + Assertions.assertTrue(registry.getDefinition("data_v").getPhaseCodes().contains("T")); + } +} diff --git a/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTaskStatusHolderTest.java b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTaskStatusHolderTest.java new file mode 100644 index 0000000..bd9ddc6 --- /dev/null +++ b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTaskStatusHolderTest.java @@ -0,0 +1,33 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.util.Arrays; + +/** + * 补数任务状态持有器测试。 + */ +class AddDataTaskStatusHolderTest { + + private final AddDataTaskStatusHolder holder = new AddDataTaskStatusHolder(new AddDataTimeSlotCalculator()); + + @Test + void shouldReturnHourlyTimeResultsWhenCreateTask() { + AddDataTaskCommand command = new AddDataTaskCommand( + Arrays.asList("1"), + LocalDateTime.of(2026, 4, 28, 10, 7, 0), + LocalDateTime.of(2026, 4, 28, 13, 0, 0), + 5); + + AddDataTaskStatusVO status = holder.createWaitingTask(command); + + Assertions.assertEquals(Arrays.asList( + "2026-04-28 11:00:00", + "2026-04-28 12:00:00", + "2026-04-28 13:00:00"), status.getHourlyTimeResults()); + } +} diff --git a/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTemplateRegistryTest.java b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTemplateRegistryTest.java new file mode 100644 index 0000000..e97d80c --- /dev/null +++ b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTemplateRegistryTest.java @@ -0,0 +1,33 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTemplateVO; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * 模板注册测试。 + */ +class AddDataTemplateRegistryTest { + + @Test + void shouldOnlyExposeABCTPhaseCodes() { + AddDataTemplateRegistry registry = new AddDataTemplateRegistry(); + + List templates = registry.getTemplates(); + Set allowedPhaseCodes = new HashSet(); + allowedPhaseCodes.add("A"); + allowedPhaseCodes.add("B"); + allowedPhaseCodes.add("C"); + allowedPhaseCodes.add("T"); + + for (AddDataTemplateVO template : templates) { + for (String phaseCode : template.getPhaseCodes()) { + Assertions.assertTrue(allowedPhaseCodes.contains(phaseCode)); + } + } + } +} diff --git a/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTimeSlotCalculatorTest.java b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTimeSlotCalculatorTest.java new file mode 100644 index 0000000..e234e4d --- /dev/null +++ b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataTimeSlotCalculatorTest.java @@ -0,0 +1,61 @@ +package com.njcn.gather.tool.adddata.component; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.util.List; + +/** + * 时间槽计算测试。 + */ +class AddDataTimeSlotCalculatorTest { + + private final AddDataTimeSlotCalculator calculator = new AddDataTimeSlotCalculator(); + + @Test + void shouldAlignToNextNaturalSlot() { + LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 7, 12); + LocalDateTime end = LocalDateTime.of(2026, 4, 28, 10, 22, 0); + + List slots = calculator.buildTimeSlots(start, end, 5); + + Assertions.assertEquals(3, slots.size()); + Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 10, 10, 0), slots.get(0)); + Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 10, 20, 0), slots.get(2)); + } + + @Test + void shouldReturnEmptyWhenRangeDoesNotContainAnySlot() { + LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 7, 0); + LocalDateTime end = LocalDateTime.of(2026, 4, 28, 10, 9, 59); + + List slots = calculator.buildTimeSlots(start, end, 10); + + Assertions.assertTrue(slots.isEmpty()); + } + + @Test + void shouldBuildHourlySlotsFromNextNaturalHour() { + LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 7, 0); + LocalDateTime end = LocalDateTime.of(2026, 4, 28, 13, 0, 0); + + List slots = calculator.buildHourlyTimeSlots(start, end); + + Assertions.assertEquals(3, slots.size()); + Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 11, 0, 0), slots.get(0)); + Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 13, 0, 0), slots.get(2)); + } + + @Test + void shouldIncludeStartWhenAlreadyAtNaturalHour() { + LocalDateTime start = LocalDateTime.of(2026, 4, 28, 10, 0, 0); + LocalDateTime end = LocalDateTime.of(2026, 4, 28, 12, 30, 0); + + List slots = calculator.buildHourlyTimeSlots(start, end); + + Assertions.assertEquals(3, slots.size()); + Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 10, 0, 0), slots.get(0)); + Assertions.assertEquals(LocalDateTime.of(2026, 4, 28, 12, 0, 0), slots.get(2)); + } +} diff --git a/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataValueGeneratorTest.java b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataValueGeneratorTest.java new file mode 100644 index 0000000..98fd99e --- /dev/null +++ b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataValueGeneratorTest.java @@ -0,0 +1,32 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.util.List; + +/** + * 数值生成器测试。 + */ +class AddDataValueGeneratorTest { + + @Test + void shouldGenerateStableDataVRowWithExpectedColumnCount() throws Exception { + AddDataTableRegistry registry = new AddDataTableRegistry(); + registry.afterPropertiesSet(); + AddDataTableDefinition definition = registry.getDefinition("data_v"); + AddDataValueGenerator generator = new AddDataValueGenerator(); + + List row = generator.generateRow(definition, "f04a9d62e3d24e6580e4f32b40967505", LocalDateTime.of(2026, 4, 28, 10, 10, 0), "A"); + + Assertions.assertEquals(definition.getColumns().size(), row.size()); + Assertions.assertEquals("A", row.get(definition.getColumns().indexOf("PHASIC_TYPE"))); + Double rms = (Double) row.get(definition.getColumns().indexOf("RMS")); + Double rmsMax = (Double) row.get(definition.getColumns().indexOf("RMS_MAX")); + Double rmsMin = (Double) row.get(definition.getColumns().indexOf("RMS_MIN")); + Assertions.assertTrue(rmsMax >= rms); + Assertions.assertTrue(rmsMin <= rms); + } +} diff --git a/tools/pom.xml b/tools/pom.xml index 4d7439d..0ac4fcc 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -21,6 +21,7 @@ activate-tool mms-mapping wave-tool + add-data