fix(tools): 修复台账节点查询和数据补录功能

- 修复 QUALITYFLAG 字段默认值从 1 改为 0
- 添加 selectNodeById 查询方法用于精确节点查找
- 重构 requireLedger 方法增加节点名称参数和详细错误提示
- 新增 levelName 辅助方法统一层级名称显示
- 更新 InfluxDB 配置地址从 192.168.1.68 改为 127.0.0.1
- 扩展 add-data 模块支持 InfluxDB 数据补录功能
- 新增 AddDataInfluxTaskController 提供 InfluxDB 补数任务接口
- 实现 AddDataInfluxFieldMapper 完成字段到 InfluxDB 测量值映射
- 添加 AddDataInfluxTaskExecutor 处理 InfluxDB 异步补数任务
- 更新 README 文档说明 InfluxDB 写入功能和配置要求
This commit is contained in:
2026-05-20 08:33:37 +08:00
parent bff89bede0
commit 89efc55119
21 changed files with 1259 additions and 18 deletions

View File

@@ -0,0 +1,190 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataInfluxFieldGroup;
import org.springframework.stereotype.Component;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.ArrayList;
/**
* add-data 表字段到 InfluxDB 字段的映射器。
*/
@Component
public class AddDataInfluxFieldMapper {
/** 无 value_type tag 的 measurement。 */
private static final Set<String> NO_VALUE_TYPE_MEASUREMENTS = new LinkedHashSet<String>(
Arrays.asList("data_flicker", "data_fluc", "data_plt"));
/** 统计类型顺序。 */
private static final List<String> VALUE_TYPES = Arrays.asList("AVG", "MAX", "MIN", "CP95");
/** 派生字段后缀映射。 */
private static final Map<String, String> STAT_SUFFIX_MAP = buildStatSuffixMap();
/**
* 按 InfluxDB value_type 分组映射字段。
*
* @param measurement measurement 名称
* @param columns 表字段
* @param row 行数据
* @return 字段分组
*/
public List<AddDataInfluxFieldGroup> mapFieldGroups(String measurement, List<String> columns, List<Object> row) {
if (!hasValueTypeTag(measurement)) {
return mapWithoutValueType(measurement, columns, row);
}
Map<String, Map<String, Object>> groupedFields = new LinkedHashMap<String, Map<String, Object>>();
for (String valueType : VALUE_TYPES) {
groupedFields.put(valueType, new LinkedHashMap<String, Object>());
}
for (int i = 0; i < columns.size(); i++) {
String column = columns.get(i);
if (isInfrastructureColumn(column)) {
continue;
}
StatColumn statColumn = resolveStatColumn(column);
Object value = row.get(i);
if (value == null) {
continue;
}
groupedFields.get(statColumn.valueType).put(mapFieldName(measurement, statColumn.baseColumn), value);
}
List<AddDataInfluxFieldGroup> result = new ArrayList<AddDataInfluxFieldGroup>();
for (String valueType : VALUE_TYPES) {
Map<String, Object> fields = groupedFields.get(valueType);
if (!fields.isEmpty()) {
result.add(new AddDataInfluxFieldGroup(valueType, fields));
}
}
return result;
}
/**
* 从行数据中解析 TIMEID。
*
* @param columns 表字段
* @param row 行数据
* @return 时间
*/
public LocalDateTime resolveTimeId(List<String> columns, List<Object> row) {
Object value = getRequiredValue(columns, row, "TIMEID");
if (value instanceof Timestamp) {
return ((Timestamp) value).toLocalDateTime();
}
if (value instanceof LocalDateTime) {
return (LocalDateTime) value;
}
throw new IllegalArgumentException("TIMEID 类型不支持:" + value.getClass().getName());
}
/**
* 从行数据中解析字符串字段。
*
* @param columns 表字段
* @param row 行数据
* @param columnName 字段名
* @return 字符串值
*/
public String resolveString(List<String> columns, List<Object> row, String columnName) {
Object value = getRequiredValue(columns, row, columnName);
return String.valueOf(value);
}
/**
* 判断 measurement 是否带 value_type tag。
*
* @param measurement measurement 名称
* @return true 表示需要写 value_type
*/
public boolean hasValueTypeTag(String measurement) {
return !NO_VALUE_TYPE_MEASUREMENTS.contains(measurement);
}
private List<AddDataInfluxFieldGroup> mapWithoutValueType(String measurement, List<String> columns, List<Object> row) {
Map<String, Object> fields = new LinkedHashMap<String, Object>();
for (int i = 0; i < columns.size(); i++) {
String column = columns.get(i);
if (isInfrastructureColumn(column) || isDerivedColumn(column)) {
continue;
}
Object value = row.get(i);
if (value != null) {
fields.put(mapFieldName(measurement, column), value);
}
}
List<AddDataInfluxFieldGroup> result = new ArrayList<AddDataInfluxFieldGroup>();
if (!fields.isEmpty()) {
result.add(new AddDataInfluxFieldGroup(null, fields));
}
return result;
}
private Object getRequiredValue(List<String> columns, List<Object> row, String columnName) {
int index = columns.indexOf(columnName);
if (index < 0 || index >= row.size() || row.get(index) == null) {
throw new IllegalArgumentException("缺少 InfluxDB 写入字段:" + columnName);
}
return row.get(index);
}
private StatColumn resolveStatColumn(String column) {
for (Map.Entry<String, String> entry : STAT_SUFFIX_MAP.entrySet()) {
if (column.endsWith(entry.getKey())) {
return new StatColumn(column.substring(0, column.length() - entry.getKey().length()), entry.getValue());
}
}
return new StatColumn(column, "AVG");
}
private boolean isInfrastructureColumn(String column) {
return "TIMEID".equals(column) || "LINEID".equals(column) || "PHASIC_TYPE".equals(column) || "QUALITYFLAG".equals(column);
}
private boolean isDerivedColumn(String column) {
for (String suffix : STAT_SUFFIX_MAP.keySet()) {
if (column.endsWith(suffix)) {
return true;
}
}
return false;
}
private String mapFieldName(String measurement, String column) {
if ("data_v".equals(measurement) && "RMSAB".equals(column)) {
return "rms_lvr";
}
return column.toLowerCase(Locale.ENGLISH);
}
private static Map<String, String> buildStatSuffixMap() {
Map<String, String> result = new LinkedHashMap<String, String>();
result.put("_MAX", "MAX");
result.put("_MIN", "MIN");
result.put("_CP95", "CP95");
return result;
}
/**
* 字段对应的统计类型。
*/
private static final class StatColumn {
/** 基础字段名。 */
private final String baseColumn;
/** 统计类型。 */
private final String valueType;
private StatColumn(String baseColumn, String valueType) {
this.baseColumn = baseColumn;
this.valueType = valueType;
}
}
}

View File

@@ -0,0 +1,233 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataBatchWriteResult;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
/**
* InfluxDB 补数异步任务执行器。
*/
@Slf4j
@Component
public class AddDataInfluxTaskExecutor {
/** 多个时间点组成一个处理窗口,兼顾批量效率和内存占用。 */
private static final int TIME_WINDOW_SIZE = 30;
/** 任务线程池。 */
private final ExecutorService addDataTaskExecutorService;
/** 表定义注册器。 */
private final AddDataTableRegistry addDataTableRegistry;
/** 时间槽计算器。 */
private final AddDataTimeSlotCalculator addDataTimeSlotCalculator;
/** 数据生成器。 */
private final AddDataValueGenerator addDataValueGenerator;
/** InfluxDB 写入器。 */
private final AddDataInfluxWriter addDataInfluxWriter;
/** 状态持有器。 */
private final AddDataTaskStatusHolder addDataTaskStatusHolder;
public AddDataInfluxTaskExecutor(@Qualifier("addDataTaskExecutorService") ExecutorService addDataTaskExecutorService,
AddDataTableRegistry addDataTableRegistry,
AddDataTimeSlotCalculator addDataTimeSlotCalculator,
AddDataValueGenerator addDataValueGenerator,
AddDataInfluxWriter addDataInfluxWriter,
AddDataTaskStatusHolder addDataTaskStatusHolder) {
this.addDataTaskExecutorService = addDataTaskExecutorService;
this.addDataTableRegistry = addDataTableRegistry;
this.addDataTimeSlotCalculator = addDataTimeSlotCalculator;
this.addDataValueGenerator = addDataValueGenerator;
this.addDataInfluxWriter = addDataInfluxWriter;
this.addDataTaskStatusHolder = addDataTaskStatusHolder;
}
/**
* 提交后台任务。
*
* @param taskId 任务编号
* @param command 任务命令
*/
public void submit(String taskId, AddDataTaskCommand command) {
addDataTaskExecutorService.submit(() -> execute(taskId, command));
}
private void execute(String taskId, AddDataTaskCommand command) {
try {
addDataTaskStatusHolder.markRunning(taskId);
Map<String, List<LocalDateTime>> timeSlotsByTable = buildTimeSlotsByTable(command);
Map<String, Set<LocalDateTime>> timeSlotLookupByTable = buildTimeSlotLookupByTable(timeSlotsByTable);
List<LocalDateTime> mergedTimeSlots = mergeTimeSlots(timeSlotsByTable);
Map<String, Integer> batchNoMap = new HashMap<String, Integer>();
Map<String, List<List<Object>>> pendingRowsByTable = buildPendingRowsByTable();
for (int startIndex = 0; startIndex < mergedTimeSlots.size(); startIndex += TIME_WINDOW_SIZE) {
int endIndex = Math.min(startIndex + TIME_WINDOW_SIZE, mergedTimeSlots.size());
List<LocalDateTime> timeWindow = mergedTimeSlots.subList(startIndex, endIndex);
GeneratedBatchData generatedBatchData = generateBatchData(command, timeWindow, timeSlotLookupByTable);
if (!writeBatchData(taskId, generatedBatchData, pendingRowsByTable, batchNoMap)) {
return;
}
}
if (!flushRemainingBatchData(taskId, pendingRowsByTable, batchNoMap)) {
return;
}
addDataTaskStatusHolder.markSuccess(taskId);
} catch (Exception ex) {
log.error("执行 InfluxDB 补数任务失败taskId={}", taskId, ex);
addDataTaskStatusHolder.markFailed(taskId, ex.getMessage());
}
}
private Map<String, List<LocalDateTime>> buildTimeSlotsByTable(AddDataTaskCommand command) {
Map<String, List<LocalDateTime>> result = new LinkedHashMap<String, List<LocalDateTime>>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
int intervalMinutes = definition.resolveIntervalMinutes(command.getIntervalMinutes());
List<LocalDateTime> timeSlots = addDataTimeSlotCalculator.buildTimeSlots(
command.getStartTime(), command.getEndTime(), intervalMinutes);
result.put(definition.getTableName(), timeSlots);
}
return result;
}
private Map<String, Set<LocalDateTime>> buildTimeSlotLookupByTable(Map<String, List<LocalDateTime>> timeSlotsByTable) {
Map<String, Set<LocalDateTime>> result = new LinkedHashMap<String, Set<LocalDateTime>>();
for (Map.Entry<String, List<LocalDateTime>> entry : timeSlotsByTable.entrySet()) {
result.put(entry.getKey(), new HashSet<LocalDateTime>(entry.getValue()));
}
return result;
}
private Map<String, List<List<Object>>> buildPendingRowsByTable() {
Map<String, List<List<Object>>> result = new LinkedHashMap<String, List<List<Object>>>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
result.put(definition.getTableName(), new ArrayList<List<Object>>());
}
return result;
}
private List<LocalDateTime> mergeTimeSlots(Map<String, List<LocalDateTime>> timeSlotsByTable) {
TreeSet<LocalDateTime> merged = new TreeSet<LocalDateTime>();
for (List<LocalDateTime> timeSlots : timeSlotsByTable.values()) {
merged.addAll(timeSlots);
}
return new ArrayList<LocalDateTime>(merged);
}
private GeneratedBatchData generateBatchData(AddDataTaskCommand command, List<LocalDateTime> timeWindow,
Map<String, Set<LocalDateTime>> timeSlotLookupByTable) {
Map<String, List<List<Object>>> rowsByTable = new LinkedHashMap<String, List<List<Object>>>();
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
rowsByTable.put(definition.getTableName(), new ArrayList<List<Object>>());
}
for (LocalDateTime timeSlot : timeWindow) {
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
Set<LocalDateTime> tableTimeSlots = timeSlotLookupByTable.get(definition.getTableName());
if (!tableTimeSlots.contains(timeSlot)) {
continue;
}
List<List<Object>> rows = rowsByTable.get(definition.getTableName());
for (String lineId : command.getLineIds()) {
for (String phaseCode : definition.getPhaseCodes()) {
rows.add(addDataValueGenerator.generateRow(definition, lineId, timeSlot, phaseCode));
}
}
}
}
return new GeneratedBatchData(rowsByTable);
}
private boolean writeBatchData(String taskId, GeneratedBatchData generatedBatchData,
Map<String, List<List<Object>>> pendingRowsByTable, Map<String, Integer> batchNoMap) {
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
List<List<Object>> rows = generatedBatchData.getRows(definition.getTableName());
if (rows.isEmpty()) {
continue;
}
List<List<Object>> pendingRows = pendingRowsByTable.get(definition.getTableName());
pendingRows.addAll(rows);
int batchSize = definition.getBatchSize();
while (pendingRows.size() >= batchSize) {
List<List<Object>> batchRows = new ArrayList<List<Object>>(pendingRows.subList(0, batchSize));
int batchNo = nextBatchNo(batchNoMap, definition.getTableName());
if (!flushBatch(taskId, definition, batchRows, batchNo)) {
return false;
}
pendingRows.subList(0, batchSize).clear();
}
}
return true;
}
private boolean flushRemainingBatchData(String taskId, Map<String, List<List<Object>>> pendingRowsByTable,
Map<String, Integer> batchNoMap) {
for (AddDataTableDefinition definition : addDataTableRegistry.getTableDefinitions()) {
List<List<Object>> pendingRows = pendingRowsByTable.get(definition.getTableName());
if (pendingRows == null || pendingRows.isEmpty()) {
continue;
}
int batchNo = nextBatchNo(batchNoMap, definition.getTableName());
List<List<Object>> batchRows = new ArrayList<List<Object>>(pendingRows);
if (!flushBatch(taskId, definition, batchRows, batchNo)) {
return false;
}
pendingRows.clear();
}
return true;
}
private int nextBatchNo(Map<String, Integer> batchNoMap, String tableName) {
Integer currentBatchNo = batchNoMap.get(tableName);
int nextBatchNo = currentBatchNo == null ? 1 : currentBatchNo + 1;
batchNoMap.put(tableName, nextBatchNo);
return nextBatchNo;
}
private boolean flushBatch(String taskId, AddDataTableDefinition definition, List<List<Object>> batchRows, int batchNo) {
addDataTaskStatusHolder.updateCurrentBatch(taskId, definition.getTableName(), batchNo, batchRows.size());
AddDataBatchWriteResult writeResult = addDataInfluxWriter.writeBatch(definition, batchRows);
addDataTaskStatusHolder.addProgress(taskId, writeResult.getInsertedCount(), writeResult.getSkippedCount(), writeResult.getFailedCount());
if (writeResult.hasFailure()) {
addDataTaskStatusHolder.markFailed(taskId,
"InfluxDB 表 " + definition.getTableName() + "" + batchNo + " 批执行失败:" + writeResult.getFirstFailureMessage());
return false;
}
return true;
}
/**
* 当前窗口生成结果。
*/
private static final class GeneratedBatchData {
/** 按表名分组的待写入行数据。 */
private final Map<String, List<List<Object>>> rowsByTable;
private GeneratedBatchData(Map<String, List<List<Object>>> rowsByTable) {
this.rowsByTable = rowsByTable;
}
private List<List<Object>> getRows(String tableName) {
List<List<Object>> rows = rowsByTable.get(tableName);
return rows == null ? Collections.emptyList() : rows;
}
}
}

View File

@@ -0,0 +1,213 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.config.AddDataInfluxDbProperties;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataBatchWriteResult;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataInfluxFieldGroup;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URLEncoder;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* add-data InfluxDB 批量写入组件。
*/
@Component
@RequiredArgsConstructor
public class AddDataInfluxWriter {
/** InfluxDB 配置。 */
private final AddDataInfluxDbProperties properties;
/** 字段映射器。 */
private final AddDataInfluxFieldMapper fieldMapper;
/**
* 写入一个批次。
*
* @param definition 表定义
* @param rows 行数据
* @return 写入结果
*/
public AddDataBatchWriteResult writeBatch(AddDataTableDefinition definition, List<List<Object>> rows) {
if (rows == null || rows.isEmpty()) {
return new AddDataBatchWriteResult(0L, 0L, 0L, null);
}
List<String> lines = new ArrayList<String>();
try {
validateConfig();
for (List<Object> row : rows) {
lines.addAll(buildRowLineProtocols(definition, row));
}
if (lines.isEmpty()) {
return new AddDataBatchWriteResult(0L, 0L, 0L, null);
}
executeWrite(String.join("\n", lines));
return new AddDataBatchWriteResult(lines.size(), 0L, 0L, null);
} catch (RuntimeException ex) {
long failedCount = lines.isEmpty() ? rows.size() : lines.size();
return new AddDataBatchWriteResult(0L, 0L, failedCount, ex.getMessage());
}
}
/**
* 构建单行 MySQL 结构对应的 InfluxDB line protocol。
*
* @param definition 表定义
* @param row 行数据
* @return line protocol 列表
*/
private List<String> buildRowLineProtocols(AddDataTableDefinition definition, List<Object> row) {
LocalDateTime timeId = fieldMapper.resolveTimeId(definition.getColumns(), row);
String lineId = fieldMapper.resolveString(definition.getColumns(), row, "LINEID");
String phasicType = fieldMapper.resolveString(definition.getColumns(), row, "PHASIC_TYPE");
String qualityFlag = fieldMapper.resolveString(definition.getColumns(), row, "QUALITYFLAG");
List<AddDataInfluxFieldGroup> groups = fieldMapper.mapFieldGroups(definition.getTableName(), definition.getColumns(), row);
List<String> result = new ArrayList<String>();
for (AddDataInfluxFieldGroup group : groups) {
Map<String, String> tags = new LinkedHashMap<String, String>();
tags.put("line_id", lineId);
tags.put("phasic_type", phasicType);
tags.put("quality_flag", qualityFlag);
if (group.getValueType() != null) {
tags.put("value_type", group.getValueType());
}
result.add(buildLineProtocol(definition.getTableName(), tags, group.getFields(), timeId));
}
return result;
}
/**
* 构建 InfluxDB line protocol。
*
* @param measurement measurement 名称
* @param tags tag 集合
* @param fields field 集合
* @param timeId 时间
* @return line protocol
*/
String buildLineProtocol(String measurement, Map<String, String> tags, Map<String, Object> fields, LocalDateTime timeId) {
StringBuilder line = new StringBuilder(escapeMeasurement(measurement));
for (Map.Entry<String, String> tag : tags.entrySet()) {
line.append(",").append(escapeKey(tag.getKey())).append("=").append(escapeKey(tag.getValue()));
}
line.append(" ");
boolean first = true;
for (Map.Entry<String, Object> field : fields.entrySet()) {
if (!first) {
line.append(",");
}
line.append(escapeKey(field.getKey())).append("=").append(formatFieldValue(field.getValue()));
first = false;
}
line.append(" ").append(toEpochNanos(timeId));
return line.toString();
}
private void executeWrite(String body) {
HttpURLConnection connection = null;
try {
URL url = new URL(buildWriteUrl());
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setConnectTimeout(properties.getConnectTimeoutMs());
connection.setReadTimeout(properties.getReadTimeoutMs());
connection.setDoOutput(true);
byte[] payload = body.getBytes(StandardCharsets.UTF_8);
connection.setFixedLengthStreamingMode(payload.length);
try (OutputStream outputStream = connection.getOutputStream()) {
outputStream.write(payload);
}
int status = connection.getResponseCode();
if (status < 200 || status >= 300) {
String errorBody = readBody(connection.getErrorStream());
throw new IllegalStateException("InfluxDB 写入失败:" + errorBody);
}
} catch (IOException ex) {
throw new IllegalStateException("InfluxDB 写入异常:" + ex.getMessage(), ex);
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
private String buildWriteUrl() throws IOException {
StringBuilder url = new StringBuilder(trimRightSlash(properties.getUrl())).append("/write?");
url.append("db=").append(encode(properties.getDatabase()));
if (properties.getUsername() != null && !properties.getUsername().trim().isEmpty()) {
url.append("&u=").append(encode(properties.getUsername().trim()));
}
if (properties.getPassword() != null && !properties.getPassword().trim().isEmpty()) {
url.append("&p=").append(encode(properties.getPassword()));
}
return url.toString();
}
private void validateConfig() {
if (properties.getUrl() == null || properties.getUrl().trim().isEmpty()) {
throw new IllegalStateException("add-data InfluxDB 地址未配置");
}
if (properties.getDatabase() == null || properties.getDatabase().trim().isEmpty()) {
throw new IllegalStateException("add-data InfluxDB database 未配置");
}
}
private String readBody(InputStream stream) throws IOException {
if (stream == null) {
return "";
}
BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
StringBuilder body = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
body.append(line);
}
return body.toString();
}
private String trimRightSlash(String value) {
String text = value.trim();
while (text.endsWith("/")) {
text = text.substring(0, text.length() - 1);
}
return text;
}
private String encode(String value) throws IOException {
return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
}
private String escapeMeasurement(String value) {
return value.replace(",", "\\,").replace(" ", "\\ ");
}
private String escapeKey(String value) {
return value.replace(",", "\\,").replace(" ", "\\ ").replace("=", "\\=");
}
private String formatFieldValue(Object value) {
if (value instanceof Number) {
return String.valueOf(value);
}
return "\"" + String.valueOf(value).replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
}
private long toEpochNanos(LocalDateTime timeId) {
return timeId.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000000L;
}
}

View File

@@ -60,7 +60,7 @@ public class AddDataValueGenerator {
continue;
}
if ("QUALITYFLAG".equals(column)) {
row.add(1);
row.add(0);
continue;
}
row.add(resolveColumnValue(definition.getTableName(), column, baseValues, state));

View File

@@ -0,0 +1,32 @@
package com.njcn.gather.tool.adddata.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* add-data InfluxDB 写入配置,复用 steady InfluxDB 配置源。
*/
@Data
@Component
@ConfigurationProperties(prefix = "steady.influxdb")
public class AddDataInfluxDbProperties {
/** InfluxDB 访问地址,例如 http://127.0.0.1:18086。 */
private String url;
/** InfluxDB database。 */
private String database;
/** 用户名。 */
private String username;
/** 密码。 */
private String password;
/** 连接超时时间。 */
private Integer connectTimeoutMs = 5000;
/** 读取超时时间。 */
private Integer readTimeoutMs = 30000;
}

View File

@@ -0,0 +1,72 @@
package com.njcn.gather.tool.adddata.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.LogUtil;
import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
import com.njcn.gather.tool.adddata.service.AddDataInfluxTaskService;
import com.njcn.web.controller.BaseController;
import com.njcn.web.utils.HttpResultUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* InfluxDB 数据补录任务接口。
*/
@Validated
@Slf4j
@Api(tags = "InfluxDB 数据补录任务")
@RestController
@RequestMapping("/addData/influx/task")
@RequiredArgsConstructor
public class AddDataInfluxTaskController extends BaseController {
/** InfluxDB 数据补录任务服务。 */
private final AddDataInfluxTaskService addDataInfluxTaskService;
/**
* 创建 InfluxDB 后台补数任务。
*
* @param param 补数参数
* @return 任务编号
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("创建 InfluxDB 电能质量批量补数任务")
@PostMapping("/create")
public HttpResult<AddDataTaskCreateVO> create(@RequestBody @Validated AddDataTaskRequestParam param) {
String methodDescribe = getMethodDescribe("create");
LogUtil.njcnDebug(log, "{},开始创建 InfluxDB 补数任务lineCount={}, intervalMinutes={}",
methodDescribe, param.getLineIds() == null ? 0 : param.getLineIds().size(), param.getIntervalMinutes());
AddDataTaskCreateVO result = addDataInfluxTaskService.create(param);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
/**
* 查询 InfluxDB 补数任务状态。
*
* @param taskId 任务编号
* @return 当前任务状态
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("查询 InfluxDB 电能质量批量补数任务状态")
@GetMapping("/status/{taskId}")
public HttpResult<AddDataTaskStatusVO> status(@PathVariable("taskId") String taskId) {
String methodDescribe = getMethodDescribe("status");
LogUtil.njcnDebug(log, "{},开始查询 InfluxDB 补数任务状态taskId={}", methodDescribe, taskId);
AddDataTaskStatusVO result = addDataInfluxTaskService.getStatus(taskId);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
}

View File

@@ -0,0 +1,47 @@
package com.njcn.gather.tool.adddata.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.gather.tool.adddata.pojo.enums.AddDataStorageTypeEnum;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataStorageTypeVO;
import com.njcn.web.controller.BaseController;
import com.njcn.web.utils.HttpResultUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
/**
* add-data 入库类型接口。
*/
@Api(tags = "数据补录入库类型")
@RestController
@RequestMapping("/addData/storage-type")
public class AddDataStorageTypeController extends BaseController {
/**
* 查询当前支持的入库类型。
*
* @return 入库类型列表
*/
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@ApiOperation("查询数据补录入库类型")
@GetMapping("/list")
public HttpResult<List<AddDataStorageTypeVO>> list() {
String methodDescribe = getMethodDescribe("list");
List<AddDataStorageTypeVO> result = new ArrayList<AddDataStorageTypeVO>();
for (AddDataStorageTypeEnum storageType : AddDataStorageTypeEnum.values()) {
AddDataStorageTypeVO vo = new AddDataStorageTypeVO();
vo.setCode(storageType.getCode());
vo.setName(storageType.getName());
result.add(vo);
}
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
}

View File

@@ -0,0 +1,25 @@
package com.njcn.gather.tool.adddata.pojo.bo;
import lombok.Getter;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* InfluxDB 单个 value_type 分组下的字段集合。
*/
@Getter
public class AddDataInfluxFieldGroup {
/** 统计类型,闪变类 measurement 不使用该 tag。 */
private final String valueType;
/** InfluxDB field 与字段值。 */
private final Map<String, Object> fields;
public AddDataInfluxFieldGroup(String valueType, Map<String, Object> fields) {
this.valueType = valueType;
this.fields = Collections.unmodifiableMap(new LinkedHashMap<String, Object>(fields));
}
}

View File

@@ -0,0 +1,27 @@
package com.njcn.gather.tool.adddata.pojo.enums;
import lombok.Getter;
/**
* add-data 支持的入库类型。
*/
@Getter
public enum AddDataStorageTypeEnum {
/** MySQL 入库。 */
MYSQL("MYSQL", "MySQL"),
/** InfluxDB 入库。 */
INFLUXDB("INFLUXDB", "InfluxDB");
/** 类型编码。 */
private final String code;
/** 展示名称。 */
private final String name;
AddDataStorageTypeEnum(String code, String name) {
this.code = code;
this.name = name;
}
}

View File

@@ -0,0 +1,19 @@
package com.njcn.gather.tool.adddata.pojo.vo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* add-data 入库类型。
*/
@Data
@ApiModel("add-data 入库类型")
public class AddDataStorageTypeVO {
@ApiModelProperty("类型编码")
private String code;
@ApiModelProperty("展示名称")
private String name;
}

View File

@@ -0,0 +1,27 @@
package com.njcn.gather.tool.adddata.service;
import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
/**
* InfluxDB 数据补录任务服务。
*/
public interface AddDataInfluxTaskService {
/**
* 创建 InfluxDB 后台补数任务。
*
* @param param 补数参数
* @return 任务编号
*/
AddDataTaskCreateVO create(AddDataTaskRequestParam param);
/**
* 查询任务状态。
*
* @param taskId 任务编号
* @return 当前任务状态
*/
AddDataTaskStatusVO getStatus(String taskId);
}

View File

@@ -0,0 +1,93 @@
package com.njcn.gather.tool.adddata.service.impl;
import com.njcn.gather.tool.adddata.component.AddDataInfluxTaskExecutor;
import com.njcn.gather.tool.adddata.component.AddDataTaskStatusHolder;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTaskCommand;
import com.njcn.gather.tool.adddata.pojo.param.AddDataTaskRequestParam;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskCreateVO;
import com.njcn.gather.tool.adddata.pojo.vo.AddDataTaskStatusVO;
import com.njcn.gather.tool.adddata.service.AddDataInfluxTaskService;
import com.njcn.gather.tool.adddata.util.AddDataDateTimeUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* InfluxDB 数据补录任务服务实现。
*/
@Service
@RequiredArgsConstructor
public class AddDataInfluxTaskServiceImpl implements AddDataInfluxTaskService {
/** 支持的用户步长。 */
private static final Set<Integer> SUPPORTED_INTERVALS = new LinkedHashSet<Integer>(Arrays.asList(1, 3, 5, 10));
/** 任务状态持有器。 */
private final AddDataTaskStatusHolder addDataTaskStatusHolder;
/** InfluxDB 后台执行器。 */
private final AddDataInfluxTaskExecutor addDataInfluxTaskExecutor;
@Override
public AddDataTaskCreateVO create(AddDataTaskRequestParam param) {
AddDataTaskCommand command = buildCommand(param);
AddDataTaskStatusVO snapshot = addDataTaskStatusHolder.createWaitingTask(command);
addDataInfluxTaskExecutor.submit(snapshot.getTaskId(), command);
AddDataTaskCreateVO result = new AddDataTaskCreateVO();
result.setTaskId(snapshot.getTaskId());
result.setStatus(snapshot.getStatus());
return result;
}
@Override
public AddDataTaskStatusVO getStatus(String taskId) {
if (taskId == null || taskId.trim().isEmpty()) {
throw new IllegalArgumentException("任务编号不能为空");
}
return addDataTaskStatusHolder.getStatus(taskId.trim());
}
private AddDataTaskCommand buildCommand(AddDataTaskRequestParam param) {
if (param == null) {
throw new IllegalArgumentException("补数参数不能为空");
}
Integer intervalMinutes = param.getIntervalMinutes();
if (intervalMinutes == null || !SUPPORTED_INTERVALS.contains(intervalMinutes)) {
throw new IllegalArgumentException("时间步长仅支持 1、3、5、10 分钟");
}
List<String> lineIds = normalizeLineIds(param.getLineIds());
LocalDateTime startTime = AddDataDateTimeUtil.parse(param.getStartTime());
LocalDateTime endTime = AddDataDateTimeUtil.parse(param.getEndTime());
if (startTime.isAfter(endTime)) {
throw new IllegalArgumentException("开始时间不能大于结束时间");
}
return new AddDataTaskCommand(lineIds, startTime, endTime, intervalMinutes);
}
private List<String> normalizeLineIds(List<String> lineIds) {
if (lineIds == null || lineIds.isEmpty()) {
throw new IllegalArgumentException("监测点 ID 列表不能为空");
}
LinkedHashSet<String> normalized = new LinkedHashSet<String>();
for (String lineId : lineIds) {
if (lineId == null) {
throw new IllegalArgumentException("监测点 ID 不能为空");
}
String normalizedLineId = lineId.trim();
if (normalizedLineId.isEmpty()) {
throw new IllegalArgumentException("监测点 ID 不能为空");
}
if (normalizedLineId.length() > 32) {
throw new IllegalArgumentException("监测点 ID 长度不能超过 32 位");
}
normalized.add(normalizedLineId);
}
return new ArrayList<String>(normalized);
}
}

View File

@@ -0,0 +1,60 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataInfluxFieldGroup;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
/**
* add-data InfluxDB 字段映射契约测试。
*/
class AddDataInfluxFieldMapperTest {
@Test
void shouldSplitStatsIntoValueTypeGroups() {
AddDataInfluxFieldMapper mapper = new AddDataInfluxFieldMapper();
List<String> columns = Arrays.asList("TIMEID", "LINEID", "PHASIC_TYPE", "QUALITYFLAG",
"RMS", "RMS_MAX", "RMS_MIN", "RMS_CP95");
List<Object> row = Arrays.<Object>asList(Timestamp.valueOf("2026-05-18 10:00:00"),
"line-001", "A", 0, 220.1D, 225.2D, 218.3D, 224.4D);
List<AddDataInfluxFieldGroup> groups = mapper.mapFieldGroups("data_v", columns, row);
Assertions.assertEquals(4, groups.size());
Assertions.assertEquals("AVG", groups.get(0).getValueType());
Assertions.assertEquals(220.1D, groups.get(0).getFields().get("rms"));
Assertions.assertEquals("MAX", groups.get(1).getValueType());
Assertions.assertEquals(225.2D, groups.get(1).getFields().get("rms"));
Assertions.assertEquals("MIN", groups.get(2).getValueType());
Assertions.assertEquals(218.3D, groups.get(2).getFields().get("rms"));
Assertions.assertEquals("CP95", groups.get(3).getValueType());
Assertions.assertEquals(224.4D, groups.get(3).getFields().get("rms"));
}
@Test
void shouldSkipValueTypeForFlickerMeasurements() {
AddDataInfluxFieldMapper mapper = new AddDataInfluxFieldMapper();
List<String> columns = Arrays.asList("TIMEID", "LINEID", "PHASIC_TYPE", "QUALITYFLAG", "FLUC", "FLUCCF");
List<Object> row = Arrays.<Object>asList(Timestamp.valueOf("2026-05-18 10:00:00"),
"line-001", "T", 0, 0.31D, 0.29D);
List<AddDataInfluxFieldGroup> groups = mapper.mapFieldGroups("data_fluc", columns, row);
Assertions.assertEquals(1, groups.size());
Assertions.assertNull(groups.get(0).getValueType());
Assertions.assertEquals(0.31D, groups.get(0).getFields().get("fluc"));
Assertions.assertEquals(0.29D, groups.get(0).getFields().get("fluccf"));
}
@Test
void shouldMapInfluxTimestampFromTimeId() {
AddDataInfluxFieldMapper mapper = new AddDataInfluxFieldMapper();
LocalDateTime time = mapper.resolveTimeId(Arrays.asList("TIMEID"), Arrays.<Object>asList(Timestamp.valueOf("2026-05-18 10:00:00")));
Assertions.assertEquals(LocalDateTime.of(2026, 5, 18, 10, 0, 0), time);
}
}

View File

@@ -0,0 +1,31 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.config.AddDataInfluxDbProperties;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* add-data InfluxDB line protocol 契约测试。
*/
class AddDataInfluxWriterTest {
@Test
void shouldBuildEscapedLineProtocolWithTimestamp() {
AddDataInfluxWriter writer = new AddDataInfluxWriter(new AddDataInfluxDbProperties(), new AddDataInfluxFieldMapper());
Map<String, String> tags = new LinkedHashMap<String, String>();
tags.put("line_id", "line 001");
tags.put("phasic_type", "A");
tags.put("quality_flag", "0");
tags.put("value_type", "AVG");
Map<String, Object> fields = new LinkedHashMap<String, Object>();
fields.put("rms", 220.1D);
String line = writer.buildLineProtocol("data v", tags, fields, LocalDateTime.of(2026, 5, 18, 10, 0, 0));
Assertions.assertEquals("data\\ v,line_id=line\\ 001,phasic_type=A,quality_flag=0,value_type=AVG rms=220.1 1779098400000000000", line);
}
}

View File

@@ -0,0 +1,28 @@
package com.njcn.gather.tool.adddata.component;
import com.njcn.gather.tool.adddata.pojo.bo.AddDataTableDefinition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
/**
* add-data 生成值契约测试。
*/
class AddDataValueGeneratorTest {
@Test
void shouldMarkGeneratedDataAsValidByDefault() {
AddDataValueGenerator generator = new AddDataValueGenerator();
AddDataTableDefinition definition = new AddDataTableDefinition("data_v",
Arrays.asList("TIMEID", "LINEID", "PHASIC_TYPE", "QUALITYFLAG", "RMS"),
Arrays.asList("A"), 100, AddDataTableDefinition.TimeAxisType.REQUEST_INTERVAL);
List<Object> row = generator.generateRow(definition, "line-001",
LocalDateTime.of(2026, 5, 18, 10, 0, 0), "A");
Assertions.assertEquals(0, row.get(3));
}
}