diff --git a/entrance/src/main/resources/application.yml b/entrance/src/main/resources/application.yml index 130193a..bc11383 100644 --- a/entrance/src/main/resources/application.yml +++ b/entrance/src/main/resources/application.yml @@ -50,7 +50,7 @@ webSocket: steady: influxdb: - url: http://192.168.1.68:18086 + url: http://127.0.0.1:18086 database: pqsbase username: admin password: 123456 diff --git a/tools/add-data/README.md b/tools/add-data/README.md index 4402171..b61411b 100644 --- a/tools/add-data/README.md +++ b/tools/add-data/README.md @@ -2,7 +2,7 @@ ## 模块定位 -`add-data` 当前提供电能质量 13 张表批量补数能力,支持补数规模预估、后台异步执行、任务状态查询和前端模板规则查询。 +`add-data` 当前提供电能质量 13 张表批量补数能力,支持补数规模预估、后台异步执行、任务状态查询、前端模板规则查询,并提供独立的 InfluxDB 写入入口。 ## 当前范围 @@ -33,13 +33,23 @@ add-data/ ## 基础骨架说明 - `controller/AddDataTaskController` - - 提供预估、创建任务、查询任务状态三个接口 + - 提供 MySQL 预估、创建任务、查询任务状态三个接口 +- `controller/AddDataInfluxTaskController` + - 提供 InfluxDB 创建任务、查询任务状态两个接口 +- `controller/AddDataStorageTypeController` + - 提供当前支持入库类型查询接口,返回 `MYSQL`、`INFLUXDB` - `controller/AddDataTemplateController` - 提供前端参数模板规则查询接口 - `component/AddDataTaskExecutor` - - 负责后台异步补数任务执行 + - 负责 MySQL 后台异步补数任务执行 +- `component/AddDataInfluxTaskExecutor` + - 负责 InfluxDB 后台异步补数任务执行 - `component/AddDataBatchWriter` - 负责 `INSERT IGNORE` 批量写入与失败降级 +- `component/AddDataInfluxWriter` + - 负责将生成数据转换为 InfluxDB line protocol 并写入 `/write` +- `component/AddDataInfluxFieldMapper` + - 负责把 add-data 表字段映射为 InfluxDB measurement、tag 和 field - `component/AddDataValueGenerator` - 负责按同源规则生成 13 张表数据 - `component/AddDataTableRegistry` @@ -51,4 +61,17 @@ add-data/ 当前实现按 `A/B/C/T` 四类数据类型生成和预估补数。 +InfluxDB 写入复用 steady 的 InfluxDB 配置源,保持两个模块使用同一个库: + +```yaml +steady: + influxdb: + url: http://127.0.0.1:18086 + database: pqsbase + username: admin + password: ${STEADY_INFLUXDB_PASSWORD:} + connect-timeout-ms: 5000 + read-timeout-ms: 30000 +``` + 后续如果补齐逐表真实相别映射、任务持久化或更细粒度模板规则,应优先沿现有职责边界扩展,不回退为单一大类承载全部逻辑。 diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataInfluxFieldMapper.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataInfluxFieldMapper.java new file mode 100644 index 0000000..0445616 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataInfluxFieldMapper.java @@ -0,0 +1,190 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataInfluxFieldGroup; +import org.springframework.stereotype.Component; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.ArrayList; + +/** + * add-data 表字段到 InfluxDB 字段的映射器。 + */ +@Component +public class AddDataInfluxFieldMapper { + + /** 无 value_type tag 的 measurement。 */ + private static final Set NO_VALUE_TYPE_MEASUREMENTS = new LinkedHashSet( + Arrays.asList("data_flicker", "data_fluc", "data_plt")); + + /** 统计类型顺序。 */ + private static final List VALUE_TYPES = Arrays.asList("AVG", "MAX", "MIN", "CP95"); + + /** 派生字段后缀映射。 */ + private static final Map STAT_SUFFIX_MAP = buildStatSuffixMap(); + + /** + * 按 InfluxDB value_type 分组映射字段。 + * + * @param measurement measurement 名称 + * @param columns 表字段 + * @param row 行数据 + * @return 字段分组 + */ + public List mapFieldGroups(String measurement, List columns, List row) { + if (!hasValueTypeTag(measurement)) { + return mapWithoutValueType(measurement, columns, row); + } + Map> groupedFields = new LinkedHashMap>(); + for (String valueType : VALUE_TYPES) { + groupedFields.put(valueType, new LinkedHashMap()); + } + for (int i = 0; i < columns.size(); i++) { + String column = columns.get(i); + if (isInfrastructureColumn(column)) { + continue; + } + StatColumn statColumn = resolveStatColumn(column); + Object value = row.get(i); + if (value == null) { + continue; + } + groupedFields.get(statColumn.valueType).put(mapFieldName(measurement, statColumn.baseColumn), value); + } + List result = new ArrayList(); + for (String valueType : VALUE_TYPES) { + Map fields = groupedFields.get(valueType); + if (!fields.isEmpty()) { + result.add(new AddDataInfluxFieldGroup(valueType, fields)); + } + } + return result; + } + + /** + * 从行数据中解析 TIMEID。 + * + * @param columns 表字段 + * @param row 行数据 + * @return 时间 + */ + public LocalDateTime resolveTimeId(List columns, List row) { + Object value = getRequiredValue(columns, row, "TIMEID"); + if (value instanceof Timestamp) { + return ((Timestamp) value).toLocalDateTime(); + } + if (value instanceof LocalDateTime) { + return (LocalDateTime) value; + } + throw new IllegalArgumentException("TIMEID 类型不支持:" + value.getClass().getName()); + } + + /** + * 从行数据中解析字符串字段。 + * + * @param columns 表字段 + * @param row 行数据 + * @param columnName 字段名 + * @return 字符串值 + */ + public String resolveString(List columns, List row, String columnName) { + Object value = getRequiredValue(columns, row, columnName); + return String.valueOf(value); + } + + /** + * 判断 measurement 是否带 value_type tag。 + * + * @param measurement measurement 名称 + * @return true 表示需要写 value_type + */ + public boolean hasValueTypeTag(String measurement) { + return !NO_VALUE_TYPE_MEASUREMENTS.contains(measurement); + } + + private List mapWithoutValueType(String measurement, List columns, List row) { + Map fields = new LinkedHashMap(); + for (int i = 0; i < columns.size(); i++) { + String column = columns.get(i); + if (isInfrastructureColumn(column) || isDerivedColumn(column)) { + continue; + } + Object value = row.get(i); + if (value != null) { + fields.put(mapFieldName(measurement, column), value); + } + } + List result = new ArrayList(); + if (!fields.isEmpty()) { + result.add(new AddDataInfluxFieldGroup(null, fields)); + } + return result; + } + + private Object getRequiredValue(List columns, List row, String columnName) { + int index = columns.indexOf(columnName); + if (index < 0 || index >= row.size() || row.get(index) == null) { + throw new IllegalArgumentException("缺少 InfluxDB 写入字段:" + columnName); + } + return row.get(index); + } + + private StatColumn resolveStatColumn(String column) { + for (Map.Entry entry : STAT_SUFFIX_MAP.entrySet()) { + if (column.endsWith(entry.getKey())) { + return new StatColumn(column.substring(0, column.length() - entry.getKey().length()), entry.getValue()); + } + } + return new StatColumn(column, "AVG"); + } + + private boolean isInfrastructureColumn(String column) { + return "TIMEID".equals(column) || "LINEID".equals(column) || "PHASIC_TYPE".equals(column) || "QUALITYFLAG".equals(column); + } + + private boolean isDerivedColumn(String column) { + for (String suffix : STAT_SUFFIX_MAP.keySet()) { + if (column.endsWith(suffix)) { + return true; + } + } + return false; + } + + private String mapFieldName(String measurement, String column) { + if ("data_v".equals(measurement) && "RMSAB".equals(column)) { + return "rms_lvr"; + } + return column.toLowerCase(Locale.ENGLISH); + } + + private static Map buildStatSuffixMap() { + Map result = new LinkedHashMap(); + result.put("_MAX", "MAX"); + result.put("_MIN", "MIN"); + result.put("_CP95", "CP95"); + return result; + } + + /** + * 字段对应的统计类型。 + */ + private static final class StatColumn { + /** 基础字段名。 */ + private final String baseColumn; + /** 统计类型。 */ + private final String valueType; + + private StatColumn(String baseColumn, String valueType) { + this.baseColumn = baseColumn; + this.valueType = valueType; + } + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataInfluxTaskExecutor.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataInfluxTaskExecutor.java new file mode 100644 index 0000000..f17b61c --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataInfluxTaskExecutor.java @@ -0,0 +1,233 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataBatchWriteResult; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ExecutorService; + +/** + * InfluxDB 补数异步任务执行器。 + */ +@Slf4j +@Component +public class AddDataInfluxTaskExecutor { + + /** 多个时间点组成一个处理窗口,兼顾批量效率和内存占用。 */ + private static final int TIME_WINDOW_SIZE = 30; + + /** 任务线程池。 */ + private final ExecutorService addDataTaskExecutorService; + + /** 表定义注册器。 */ + private final AddDataTableRegistry addDataTableRegistry; + + /** 时间槽计算器。 */ + private final AddDataTimeSlotCalculator addDataTimeSlotCalculator; + + /** 数据生成器。 */ + private final AddDataValueGenerator addDataValueGenerator; + + /** InfluxDB 写入器。 */ + private final AddDataInfluxWriter addDataInfluxWriter; + + /** 状态持有器。 */ + private final AddDataTaskStatusHolder addDataTaskStatusHolder; + + public AddDataInfluxTaskExecutor(@Qualifier("addDataTaskExecutorService") ExecutorService addDataTaskExecutorService, + AddDataTableRegistry addDataTableRegistry, + AddDataTimeSlotCalculator addDataTimeSlotCalculator, + AddDataValueGenerator addDataValueGenerator, + AddDataInfluxWriter addDataInfluxWriter, + AddDataTaskStatusHolder addDataTaskStatusHolder) { + this.addDataTaskExecutorService = addDataTaskExecutorService; + this.addDataTableRegistry = addDataTableRegistry; + this.addDataTimeSlotCalculator = addDataTimeSlotCalculator; + this.addDataValueGenerator = addDataValueGenerator; + this.addDataInfluxWriter = addDataInfluxWriter; + this.addDataTaskStatusHolder = addDataTaskStatusHolder; + } + + /** + * 提交后台任务。 + * + * @param taskId 任务编号 + * @param command 任务命令 + */ + public void submit(String taskId, AddDataTaskCommand command) { + addDataTaskExecutorService.submit(() -> execute(taskId, command)); + } + + private void execute(String taskId, AddDataTaskCommand command) { + try { + addDataTaskStatusHolder.markRunning(taskId); + Map> timeSlotsByTable = buildTimeSlotsByTable(command); + Map> timeSlotLookupByTable = buildTimeSlotLookupByTable(timeSlotsByTable); + List mergedTimeSlots = mergeTimeSlots(timeSlotsByTable); + Map batchNoMap = new HashMap(); + Map>> pendingRowsByTable = buildPendingRowsByTable(); + for (int startIndex = 0; startIndex < mergedTimeSlots.size(); startIndex += TIME_WINDOW_SIZE) { + int endIndex = Math.min(startIndex + TIME_WINDOW_SIZE, mergedTimeSlots.size()); + List timeWindow = mergedTimeSlots.subList(startIndex, endIndex); + GeneratedBatchData generatedBatchData = generateBatchData(command, timeWindow, timeSlotLookupByTable); + if (!writeBatchData(taskId, generatedBatchData, pendingRowsByTable, batchNoMap)) { + return; + } + } + if (!flushRemainingBatchData(taskId, pendingRowsByTable, batchNoMap)) { + return; + } + addDataTaskStatusHolder.markSuccess(taskId); + } catch (Exception ex) { + log.error("执行 InfluxDB 补数任务失败,taskId={}", taskId, ex); + addDataTaskStatusHolder.markFailed(taskId, ex.getMessage()); + } + } + + private Map> buildTimeSlotsByTable(AddDataTaskCommand command) { + Map> result = new LinkedHashMap>(); + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + int intervalMinutes = definition.resolveIntervalMinutes(command.getIntervalMinutes()); + List timeSlots = addDataTimeSlotCalculator.buildTimeSlots( + command.getStartTime(), command.getEndTime(), intervalMinutes); + result.put(definition.getTableName(), timeSlots); + } + return result; + } + + private Map> buildTimeSlotLookupByTable(Map> timeSlotsByTable) { + Map> result = new LinkedHashMap>(); + for (Map.Entry> entry : timeSlotsByTable.entrySet()) { + result.put(entry.getKey(), new HashSet(entry.getValue())); + } + return result; + } + + private Map>> buildPendingRowsByTable() { + Map>> result = new LinkedHashMap>>(); + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + result.put(definition.getTableName(), new ArrayList>()); + } + return result; + } + + private List mergeTimeSlots(Map> timeSlotsByTable) { + TreeSet merged = new TreeSet(); + for (List timeSlots : timeSlotsByTable.values()) { + merged.addAll(timeSlots); + } + return new ArrayList(merged); + } + + private GeneratedBatchData generateBatchData(AddDataTaskCommand command, List timeWindow, + Map> timeSlotLookupByTable) { + Map>> rowsByTable = new LinkedHashMap>>(); + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + rowsByTable.put(definition.getTableName(), new ArrayList>()); + } + for (LocalDateTime timeSlot : timeWindow) { + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + Set tableTimeSlots = timeSlotLookupByTable.get(definition.getTableName()); + if (!tableTimeSlots.contains(timeSlot)) { + continue; + } + List> rows = rowsByTable.get(definition.getTableName()); + for (String lineId : command.getLineIds()) { + for (String phaseCode : definition.getPhaseCodes()) { + rows.add(addDataValueGenerator.generateRow(definition, lineId, timeSlot, phaseCode)); + } + } + } + } + return new GeneratedBatchData(rowsByTable); + } + + private boolean writeBatchData(String taskId, GeneratedBatchData generatedBatchData, + Map>> pendingRowsByTable, Map batchNoMap) { + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + List> rows = generatedBatchData.getRows(definition.getTableName()); + if (rows.isEmpty()) { + continue; + } + List> pendingRows = pendingRowsByTable.get(definition.getTableName()); + pendingRows.addAll(rows); + int batchSize = definition.getBatchSize(); + while (pendingRows.size() >= batchSize) { + List> batchRows = new ArrayList>(pendingRows.subList(0, batchSize)); + int batchNo = nextBatchNo(batchNoMap, definition.getTableName()); + if (!flushBatch(taskId, definition, batchRows, batchNo)) { + return false; + } + pendingRows.subList(0, batchSize).clear(); + } + } + return true; + } + + private boolean flushRemainingBatchData(String taskId, Map>> pendingRowsByTable, + Map batchNoMap) { + for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) { + List> pendingRows = pendingRowsByTable.get(definition.getTableName()); + if (pendingRows == null || pendingRows.isEmpty()) { + continue; + } + int batchNo = nextBatchNo(batchNoMap, definition.getTableName()); + List> batchRows = new ArrayList>(pendingRows); + if (!flushBatch(taskId, definition, batchRows, batchNo)) { + return false; + } + pendingRows.clear(); + } + return true; + } + + private int nextBatchNo(Map batchNoMap, String tableName) { + Integer currentBatchNo = batchNoMap.get(tableName); + int nextBatchNo = currentBatchNo == null ? 1 : currentBatchNo + 1; + batchNoMap.put(tableName, nextBatchNo); + return nextBatchNo; + } + + private boolean flushBatch(String taskId, AddDataTableDefinition definition, List> batchRows, int batchNo) { + addDataTaskStatusHolder.updateCurrentBatch(taskId, definition.getTableName(), batchNo, batchRows.size()); + AddDataBatchWriteResult writeResult = addDataInfluxWriter.writeBatch(definition, batchRows); + addDataTaskStatusHolder.addProgress(taskId, writeResult.getInsertedCount(), writeResult.getSkippedCount(), writeResult.getFailedCount()); + if (writeResult.hasFailure()) { + addDataTaskStatusHolder.markFailed(taskId, + "InfluxDB 表 " + definition.getTableName() + " 第 " + batchNo + " 批执行失败:" + writeResult.getFirstFailureMessage()); + return false; + } + return true; + } + + /** + * 当前窗口生成结果。 + */ + private static final class GeneratedBatchData { + + /** 按表名分组的待写入行数据。 */ + private final Map>> rowsByTable; + + private GeneratedBatchData(Map>> rowsByTable) { + this.rowsByTable = rowsByTable; + } + + private List> getRows(String tableName) { + List> rows = rowsByTable.get(tableName); + return rows == null ? Collections.emptyList() : rows; + } + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataInfluxWriter.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataInfluxWriter.java new file mode 100644 index 0000000..b7d473a --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataInfluxWriter.java @@ -0,0 +1,213 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.config.AddDataInfluxDbProperties; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataBatchWriteResult; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataInfluxFieldGroup; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URLEncoder; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * add-data InfluxDB 批量写入组件。 + */ +@Component +@RequiredArgsConstructor +public class AddDataInfluxWriter { + + /** InfluxDB 配置。 */ + private final AddDataInfluxDbProperties properties; + + /** 字段映射器。 */ + private final AddDataInfluxFieldMapper fieldMapper; + + /** + * 写入一个批次。 + * + * @param definition 表定义 + * @param rows 行数据 + * @return 写入结果 + */ + public AddDataBatchWriteResult writeBatch(AddDataTableDefinition definition, List> rows) { + if (rows == null || rows.isEmpty()) { + return new AddDataBatchWriteResult(0L, 0L, 0L, null); + } + List lines = new ArrayList(); + try { + validateConfig(); + for (List row : rows) { + lines.addAll(buildRowLineProtocols(definition, row)); + } + if (lines.isEmpty()) { + return new AddDataBatchWriteResult(0L, 0L, 0L, null); + } + executeWrite(String.join("\n", lines)); + return new AddDataBatchWriteResult(lines.size(), 0L, 0L, null); + } catch (RuntimeException ex) { + long failedCount = lines.isEmpty() ? rows.size() : lines.size(); + return new AddDataBatchWriteResult(0L, 0L, failedCount, ex.getMessage()); + } + } + + /** + * 构建单行 MySQL 结构对应的 InfluxDB line protocol。 + * + * @param definition 表定义 + * @param row 行数据 + * @return line protocol 列表 + */ + private List buildRowLineProtocols(AddDataTableDefinition definition, List row) { + LocalDateTime timeId = fieldMapper.resolveTimeId(definition.getColumns(), row); + String lineId = fieldMapper.resolveString(definition.getColumns(), row, "LINEID"); + String phasicType = fieldMapper.resolveString(definition.getColumns(), row, "PHASIC_TYPE"); + String qualityFlag = fieldMapper.resolveString(definition.getColumns(), row, "QUALITYFLAG"); + List groups = fieldMapper.mapFieldGroups(definition.getTableName(), definition.getColumns(), row); + List result = new ArrayList(); + for (AddDataInfluxFieldGroup group : groups) { + Map tags = new LinkedHashMap(); + tags.put("line_id", lineId); + tags.put("phasic_type", phasicType); + tags.put("quality_flag", qualityFlag); + if (group.getValueType() != null) { + tags.put("value_type", group.getValueType()); + } + result.add(buildLineProtocol(definition.getTableName(), tags, group.getFields(), timeId)); + } + return result; + } + + /** + * 构建 InfluxDB line protocol。 + * + * @param measurement measurement 名称 + * @param tags tag 集合 + * @param fields field 集合 + * @param timeId 时间 + * @return line protocol + */ + String buildLineProtocol(String measurement, Map tags, Map fields, LocalDateTime timeId) { + StringBuilder line = new StringBuilder(escapeMeasurement(measurement)); + for (Map.Entry tag : tags.entrySet()) { + line.append(",").append(escapeKey(tag.getKey())).append("=").append(escapeKey(tag.getValue())); + } + line.append(" "); + boolean first = true; + for (Map.Entry field : fields.entrySet()) { + if (!first) { + line.append(","); + } + line.append(escapeKey(field.getKey())).append("=").append(formatFieldValue(field.getValue())); + first = false; + } + line.append(" ").append(toEpochNanos(timeId)); + return line.toString(); + } + + private void executeWrite(String body) { + HttpURLConnection connection = null; + try { + URL url = new URL(buildWriteUrl()); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setConnectTimeout(properties.getConnectTimeoutMs()); + connection.setReadTimeout(properties.getReadTimeoutMs()); + connection.setDoOutput(true); + byte[] payload = body.getBytes(StandardCharsets.UTF_8); + connection.setFixedLengthStreamingMode(payload.length); + try (OutputStream outputStream = connection.getOutputStream()) { + outputStream.write(payload); + } + int status = connection.getResponseCode(); + if (status < 200 || status >= 300) { + String errorBody = readBody(connection.getErrorStream()); + throw new IllegalStateException("InfluxDB 写入失败:" + errorBody); + } + } catch (IOException ex) { + throw new IllegalStateException("InfluxDB 写入异常:" + ex.getMessage(), ex); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + private String buildWriteUrl() throws IOException { + StringBuilder url = new StringBuilder(trimRightSlash(properties.getUrl())).append("/write?"); + url.append("db=").append(encode(properties.getDatabase())); + if (properties.getUsername() != null && !properties.getUsername().trim().isEmpty()) { + url.append("&u=").append(encode(properties.getUsername().trim())); + } + if (properties.getPassword() != null && !properties.getPassword().trim().isEmpty()) { + url.append("&p=").append(encode(properties.getPassword())); + } + return url.toString(); + } + + private void validateConfig() { + if (properties.getUrl() == null || properties.getUrl().trim().isEmpty()) { + throw new IllegalStateException("add-data InfluxDB 地址未配置"); + } + if (properties.getDatabase() == null || properties.getDatabase().trim().isEmpty()) { + throw new IllegalStateException("add-data InfluxDB database 未配置"); + } + } + + private String readBody(InputStream stream) throws IOException { + if (stream == null) { + return ""; + } + BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8)); + StringBuilder body = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + body.append(line); + } + return body.toString(); + } + + private String trimRightSlash(String value) { + String text = value.trim(); + while (text.endsWith("/")) { + text = text.substring(0, text.length() - 1); + } + return text; + } + + private String encode(String value) throws IOException { + return URLEncoder.encode(value, StandardCharsets.UTF_8.name()); + } + + private String escapeMeasurement(String value) { + return value.replace(",", "\\,").replace(" ", "\\ "); + } + + private String escapeKey(String value) { + return value.replace(",", "\\,").replace(" ", "\\ ").replace("=", "\\="); + } + + private String formatFieldValue(Object value) { + if (value instanceof Number) { + return String.valueOf(value); + } + return "\"" + String.valueOf(value).replace("\\", "\\\\").replace("\"", "\\\"") + "\""; + } + + private long toEpochNanos(LocalDateTime timeId) { + return timeId.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000000L; + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataValueGenerator.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataValueGenerator.java index 415e20d..e51324d 100644 --- a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataValueGenerator.java +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/component/AddDataValueGenerator.java @@ -60,7 +60,7 @@ public class AddDataValueGenerator { continue; } if ("QUALITYFLAG".equals(column)) { - row.add(1); + row.add(0); continue; } row.add(resolveColumnValue(definition.getTableName(), column, baseValues, state)); diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/config/AddDataInfluxDbProperties.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/config/AddDataInfluxDbProperties.java new file mode 100644 index 0000000..d449eb2 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/config/AddDataInfluxDbProperties.java @@ -0,0 +1,32 @@ +package com.njcn.gather.tool.adddata.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * add-data InfluxDB 写入配置,复用 steady InfluxDB 配置源。 + */ +@Data +@Component +@ConfigurationProperties(prefix = "steady.influxdb") +public class AddDataInfluxDbProperties { + + /** InfluxDB 访问地址,例如 http://127.0.0.1:18086。 */ + private String url; + + /** InfluxDB database。 */ + private String database; + + /** 用户名。 */ + private String username; + + /** 密码。 */ + private String password; + + /** 连接超时时间。 */ + private Integer connectTimeoutMs = 5000; + + /** 读取超时时间。 */ + private Integer readTimeoutMs = 30000; +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataInfluxTaskController.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataInfluxTaskController.java new file mode 100644 index 0000000..c50b3a3 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataInfluxTaskController.java @@ -0,0 +1,72 @@ +package com.njcn.gather.tool.adddata.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.LogUtil; +import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO; +import com.njcn.gather.tool.adddata.service.AddDataInfluxTaskService; +import com.njcn.web.controller.BaseController; +import com.njcn.web.utils.HttpResultUtil; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * InfluxDB 数据补录任务接口。 + */ +@Validated +@Slf4j +@Api(tags = "InfluxDB 数据补录任务") +@RestController +@RequestMapping("/addData/influx/task") +@RequiredArgsConstructor +public class AddDataInfluxTaskController extends BaseController { + + /** InfluxDB 数据补录任务服务。 */ + private final AddDataInfluxTaskService addDataInfluxTaskService; + + /** + * 创建 InfluxDB 后台补数任务。 + * + * @param param 补数参数 + * @return 任务编号 + */ + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("创建 InfluxDB 电能质量批量补数任务") + @PostMapping("/create") + public HttpResult create(@RequestBody @Validated AddDataTaskRequestParam param) { + String methodDescribe = getMethodDescribe("create"); + LogUtil.njcnDebug(log, "{},开始创建 InfluxDB 补数任务,lineCount={}, intervalMinutes={}", + methodDescribe, param.getLineIds() == null ? 0 : param.getLineIds().size(), param.getIntervalMinutes()); + AddDataTaskCreateVO result = addDataInfluxTaskService.create(param); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); + } + + /** + * 查询 InfluxDB 补数任务状态。 + * + * @param taskId 任务编号 + * @return 当前任务状态 + */ + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("查询 InfluxDB 电能质量批量补数任务状态") + @GetMapping("/status/{taskId}") + public HttpResult status(@PathVariable("taskId") String taskId) { + String methodDescribe = getMethodDescribe("status"); + LogUtil.njcnDebug(log, "{},开始查询 InfluxDB 补数任务状态,taskId={}", methodDescribe, taskId); + AddDataTaskStatusVO result = addDataInfluxTaskService.getStatus(taskId); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataStorageTypeController.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataStorageTypeController.java new file mode 100644 index 0000000..a8303bb --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/controller/AddDataStorageTypeController.java @@ -0,0 +1,47 @@ +package com.njcn.gather.tool.adddata.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.gather.tool.adddata.pojo.enums.AddDataStorageTypeEnum; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataStorageTypeVO; +import com.njcn.web.controller.BaseController; +import com.njcn.web.utils.HttpResultUtil; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; + +/** + * add-data 入库类型接口。 + */ +@Api(tags = "数据补录入库类型") +@RestController +@RequestMapping("/addData/storage-type") +public class AddDataStorageTypeController extends BaseController { + + /** + * 查询当前支持的入库类型。 + * + * @return 入库类型列表 + */ + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @ApiOperation("查询数据补录入库类型") + @GetMapping("/list") + public HttpResult> list() { + String methodDescribe = getMethodDescribe("list"); + List result = new ArrayList(); + for (AddDataStorageTypeEnum storageType : AddDataStorageTypeEnum.values()) { + AddDataStorageTypeVO vo = new AddDataStorageTypeVO(); + vo.setCode(storageType.getCode()); + vo.setName(storageType.getName()); + result.add(vo); + } + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe); + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataInfluxFieldGroup.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataInfluxFieldGroup.java new file mode 100644 index 0000000..fb76e35 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/bo/AddDataInfluxFieldGroup.java @@ -0,0 +1,25 @@ +package com.njcn.gather.tool.adddata.pojo.bo; + +import lombok.Getter; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * InfluxDB 单个 value_type 分组下的字段集合。 + */ +@Getter +public class AddDataInfluxFieldGroup { + + /** 统计类型,闪变类 measurement 不使用该 tag。 */ + private final String valueType; + + /** InfluxDB field 与字段值。 */ + private final Map fields; + + public AddDataInfluxFieldGroup(String valueType, Map fields) { + this.valueType = valueType; + this.fields = Collections.unmodifiableMap(new LinkedHashMap(fields)); + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/enums/AddDataStorageTypeEnum.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/enums/AddDataStorageTypeEnum.java new file mode 100644 index 0000000..20b84ff --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/enums/AddDataStorageTypeEnum.java @@ -0,0 +1,27 @@ +package com.njcn.gather.tool.adddata.pojo.enums; + +import lombok.Getter; + +/** + * add-data 支持的入库类型。 + */ +@Getter +public enum AddDataStorageTypeEnum { + + /** MySQL 入库。 */ + MYSQL("MYSQL", "MySQL"), + + /** InfluxDB 入库。 */ + INFLUXDB("INFLUXDB", "InfluxDB"); + + /** 类型编码。 */ + private final String code; + + /** 展示名称。 */ + private final String name; + + AddDataStorageTypeEnum(String code, String name) { + this.code = code; + this.name = name; + } +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataStorageTypeVO.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataStorageTypeVO.java new file mode 100644 index 0000000..53cc758 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/pojo/vo/AddDataStorageTypeVO.java @@ -0,0 +1,19 @@ +package com.njcn.gather.tool.adddata.pojo.vo; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * add-data 入库类型。 + */ +@Data +@ApiModel("add-data 入库类型") +public class AddDataStorageTypeVO { + + @ApiModelProperty("类型编码") + private String code; + + @ApiModelProperty("展示名称") + private String name; +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/AddDataInfluxTaskService.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/AddDataInfluxTaskService.java new file mode 100644 index 0000000..253b3b1 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/AddDataInfluxTaskService.java @@ -0,0 +1,27 @@ +package com.njcn.gather.tool.adddata.service; + +import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO; + +/** + * InfluxDB 数据补录任务服务。 + */ +public interface AddDataInfluxTaskService { + + /** + * 创建 InfluxDB 后台补数任务。 + * + * @param param 补数参数 + * @return 任务编号 + */ + AddDataTaskCreateVO create(AddDataTaskRequestParam param); + + /** + * 查询任务状态。 + * + * @param taskId 任务编号 + * @return 当前任务状态 + */ + AddDataTaskStatusVO getStatus(String taskId); +} diff --git a/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/impl/AddDataInfluxTaskServiceImpl.java b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/impl/AddDataInfluxTaskServiceImpl.java new file mode 100644 index 0000000..8560168 --- /dev/null +++ b/tools/add-data/src/main/java/com/njcn/gather/tool/adddata/service/impl/AddDataInfluxTaskServiceImpl.java @@ -0,0 +1,93 @@ +package com.njcn.gather.tool.adddata.service.impl; + +import com.njcn.gather.tool.adddata.component.AddDataInfluxTaskExecutor; +import com.njcn.gather.tool.adddata.component.AddDataTaskStatusHolder; +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand; +import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO; +import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO; +import com.njcn.gather.tool.adddata.service.AddDataInfluxTaskService; +import com.njcn.gather.tool.adddata.util.AddDataDateTimeUtil; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * InfluxDB 数据补录任务服务实现。 + */ +@Service +@RequiredArgsConstructor +public class AddDataInfluxTaskServiceImpl implements AddDataInfluxTaskService { + + /** 支持的用户步长。 */ + private static final Set SUPPORTED_INTERVALS = new LinkedHashSet(Arrays.asList(1, 3, 5, 10)); + + /** 任务状态持有器。 */ + private final AddDataTaskStatusHolder addDataTaskStatusHolder; + + /** InfluxDB 后台执行器。 */ + private final AddDataInfluxTaskExecutor addDataInfluxTaskExecutor; + + @Override + public AddDataTaskCreateVO create(AddDataTaskRequestParam param) { + AddDataTaskCommand command = buildCommand(param); + AddDataTaskStatusVO snapshot = addDataTaskStatusHolder.createWaitingTask(command); + addDataInfluxTaskExecutor.submit(snapshot.getTaskId(), command); + AddDataTaskCreateVO result = new AddDataTaskCreateVO(); + result.setTaskId(snapshot.getTaskId()); + result.setStatus(snapshot.getStatus()); + return result; + } + + @Override + public AddDataTaskStatusVO getStatus(String taskId) { + if (taskId == null || taskId.trim().isEmpty()) { + throw new IllegalArgumentException("任务编号不能为空"); + } + return addDataTaskStatusHolder.getStatus(taskId.trim()); + } + + private AddDataTaskCommand buildCommand(AddDataTaskRequestParam param) { + if (param == null) { + throw new IllegalArgumentException("补数参数不能为空"); + } + Integer intervalMinutes = param.getIntervalMinutes(); + if (intervalMinutes == null || !SUPPORTED_INTERVALS.contains(intervalMinutes)) { + throw new IllegalArgumentException("时间步长仅支持 1、3、5、10 分钟"); + } + List lineIds = normalizeLineIds(param.getLineIds()); + LocalDateTime startTime = AddDataDateTimeUtil.parse(param.getStartTime()); + LocalDateTime endTime = AddDataDateTimeUtil.parse(param.getEndTime()); + if (startTime.isAfter(endTime)) { + throw new IllegalArgumentException("开始时间不能大于结束时间"); + } + return new AddDataTaskCommand(lineIds, startTime, endTime, intervalMinutes); + } + + private List normalizeLineIds(List lineIds) { + if (lineIds == null || lineIds.isEmpty()) { + throw new IllegalArgumentException("监测点 ID 列表不能为空"); + } + LinkedHashSet normalized = new LinkedHashSet(); + for (String lineId : lineIds) { + if (lineId == null) { + throw new IllegalArgumentException("监测点 ID 不能为空"); + } + String normalizedLineId = lineId.trim(); + if (normalizedLineId.isEmpty()) { + throw new IllegalArgumentException("监测点 ID 不能为空"); + } + if (normalizedLineId.length() > 32) { + throw new IllegalArgumentException("监测点 ID 长度不能超过 32 位"); + } + normalized.add(normalizedLineId); + } + return new ArrayList(normalized); + } +} diff --git a/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataInfluxFieldMapperTest.java b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataInfluxFieldMapperTest.java new file mode 100644 index 0000000..e268d1f --- /dev/null +++ b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataInfluxFieldMapperTest.java @@ -0,0 +1,60 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataInfluxFieldGroup; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +/** + * add-data InfluxDB 字段映射契约测试。 + */ +class AddDataInfluxFieldMapperTest { + + @Test + void shouldSplitStatsIntoValueTypeGroups() { + AddDataInfluxFieldMapper mapper = new AddDataInfluxFieldMapper(); + List columns = Arrays.asList("TIMEID", "LINEID", "PHASIC_TYPE", "QUALITYFLAG", + "RMS", "RMS_MAX", "RMS_MIN", "RMS_CP95"); + List row = Arrays.asList(Timestamp.valueOf("2026-05-18 10:00:00"), + "line-001", "A", 0, 220.1D, 225.2D, 218.3D, 224.4D); + + List groups = mapper.mapFieldGroups("data_v", columns, row); + + Assertions.assertEquals(4, groups.size()); + Assertions.assertEquals("AVG", groups.get(0).getValueType()); + Assertions.assertEquals(220.1D, groups.get(0).getFields().get("rms")); + Assertions.assertEquals("MAX", groups.get(1).getValueType()); + Assertions.assertEquals(225.2D, groups.get(1).getFields().get("rms")); + Assertions.assertEquals("MIN", groups.get(2).getValueType()); + Assertions.assertEquals(218.3D, groups.get(2).getFields().get("rms")); + Assertions.assertEquals("CP95", groups.get(3).getValueType()); + Assertions.assertEquals(224.4D, groups.get(3).getFields().get("rms")); + } + + @Test + void shouldSkipValueTypeForFlickerMeasurements() { + AddDataInfluxFieldMapper mapper = new AddDataInfluxFieldMapper(); + List columns = Arrays.asList("TIMEID", "LINEID", "PHASIC_TYPE", "QUALITYFLAG", "FLUC", "FLUCCF"); + List row = Arrays.asList(Timestamp.valueOf("2026-05-18 10:00:00"), + "line-001", "T", 0, 0.31D, 0.29D); + + List groups = mapper.mapFieldGroups("data_fluc", columns, row); + + Assertions.assertEquals(1, groups.size()); + Assertions.assertNull(groups.get(0).getValueType()); + Assertions.assertEquals(0.31D, groups.get(0).getFields().get("fluc")); + Assertions.assertEquals(0.29D, groups.get(0).getFields().get("fluccf")); + } + + @Test + void shouldMapInfluxTimestampFromTimeId() { + AddDataInfluxFieldMapper mapper = new AddDataInfluxFieldMapper(); + LocalDateTime time = mapper.resolveTimeId(Arrays.asList("TIMEID"), Arrays.asList(Timestamp.valueOf("2026-05-18 10:00:00"))); + + Assertions.assertEquals(LocalDateTime.of(2026, 5, 18, 10, 0, 0), time); + } +} diff --git a/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataInfluxWriterTest.java b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataInfluxWriterTest.java new file mode 100644 index 0000000..9c27c57 --- /dev/null +++ b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataInfluxWriterTest.java @@ -0,0 +1,31 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.config.AddDataInfluxDbProperties; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * add-data InfluxDB line protocol 契约测试。 + */ +class AddDataInfluxWriterTest { + + @Test + void shouldBuildEscapedLineProtocolWithTimestamp() { + AddDataInfluxWriter writer = new AddDataInfluxWriter(new AddDataInfluxDbProperties(), new AddDataInfluxFieldMapper()); + Map tags = new LinkedHashMap(); + tags.put("line_id", "line 001"); + tags.put("phasic_type", "A"); + tags.put("quality_flag", "0"); + tags.put("value_type", "AVG"); + Map fields = new LinkedHashMap(); + fields.put("rms", 220.1D); + + String line = writer.buildLineProtocol("data v", tags, fields, LocalDateTime.of(2026, 5, 18, 10, 0, 0)); + + Assertions.assertEquals("data\\ v,line_id=line\\ 001,phasic_type=A,quality_flag=0,value_type=AVG rms=220.1 1779098400000000000", line); + } +} diff --git a/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataValueGeneratorTest.java b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataValueGeneratorTest.java new file mode 100644 index 0000000..cab4ddd --- /dev/null +++ b/tools/add-data/src/test/java/com/njcn/gather/tool/adddata/component/AddDataValueGeneratorTest.java @@ -0,0 +1,28 @@ +package com.njcn.gather.tool.adddata.component; + +import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +/** + * add-data 生成值契约测试。 + */ +class AddDataValueGeneratorTest { + + @Test + void shouldMarkGeneratedDataAsValidByDefault() { + AddDataValueGenerator generator = new AddDataValueGenerator(); + AddDataTableDefinition definition = new AddDataTableDefinition("data_v", + Arrays.asList("TIMEID", "LINEID", "PHASIC_TYPE", "QUALITYFLAG", "RMS"), + Arrays.asList("A"), 100, AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL); + + List row = generator.generateRow(definition, "line-001", + LocalDateTime.of(2026, 5, 18, 10, 0, 0), "A"); + + Assertions.assertEquals(0, row.get(3)); + } +} diff --git a/tools/add-ledger/src/main/java/com/njcn/gather/tool/addledger/mapper/AddLedgerLedgerMapper.java b/tools/add-ledger/src/main/java/com/njcn/gather/tool/addledger/mapper/AddLedgerLedgerMapper.java index e851a43..911b848 100644 --- a/tools/add-ledger/src/main/java/com/njcn/gather/tool/addledger/mapper/AddLedgerLedgerMapper.java +++ b/tools/add-ledger/src/main/java/com/njcn/gather/tool/addledger/mapper/AddLedgerLedgerMapper.java @@ -15,6 +15,8 @@ public interface AddLedgerLedgerMapper extends BaseMapper { AddLedgerLedgerPO selectActiveNode(@Param("id") String id, @Param("level") Integer level); + AddLedgerLedgerPO selectNodeById(@Param("id") String id); + List selectActiveSubtree(@Param("id") String id); int softDeleteByIds(@Param("ids") List ids, @Param("updateBy") String updateBy); diff --git a/tools/add-ledger/src/main/java/com/njcn/gather/tool/addledger/mapper/mapping/AddLedgerLedgerMapper.xml b/tools/add-ledger/src/main/java/com/njcn/gather/tool/addledger/mapper/mapping/AddLedgerLedgerMapper.xml index 39923dd..0d808e6 100644 --- a/tools/add-ledger/src/main/java/com/njcn/gather/tool/addledger/mapper/mapping/AddLedgerLedgerMapper.xml +++ b/tools/add-ledger/src/main/java/com/njcn/gather/tool/addledger/mapper/mapping/AddLedgerLedgerMapper.xml @@ -36,6 +36,20 @@ LIMIT 1 + +