influx数据查询调整

This commit is contained in:
2023-06-08 21:04:22 +08:00
parent 818538f34d
commit 0b88a7a747
14 changed files with 15 additions and 302 deletions

View File

@@ -1,36 +0,0 @@
package com.njcn.csharmonic.pojo.influx;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.njcn.common.utils.serializer.InstantDateSerializer;
import lombok.Data;
import org.influxdb.annotation.Column;
import java.time.Instant;
/**
* 类的介绍influx实时数据通用实体
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/6 9:26
*/
@Data
public class CsCommon {
@Column(name = "time")
@JsonSerialize(using = InstantDateSerializer.class)
private Instant time;
@Column(name = "line_id")
private String lineId;
@Column(name = "phasic_type")
private String phaseType;
@Column(name = "value_type")
private String valueType;
@Column(name = "rtData")
private Double rtData;
}

View File

@@ -1,18 +0,0 @@
package com.njcn.csharmonic.pojo.influx;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.influxdb.annotation.Measurement;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/6 9:27
*/
@EqualsAndHashCode(callSuper = true)
@Data
@Measurement(name = "data_harmphasic_i")
public class CsDataHarmPhasicI extends CsCommon{
}

View File

@@ -1,18 +0,0 @@
package com.njcn.csharmonic.pojo.influx;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.influxdb.annotation.Measurement;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/6 9:27
*/
@EqualsAndHashCode(callSuper = true)
@Data
@Measurement(name = "data_harmphasic_v")
public class CsDataHarmPhasicV extends CsCommon{
}

View File

@@ -1,17 +0,0 @@
package com.njcn.csharmonic.pojo.influx;
import lombok.Data;
import org.influxdb.annotation.Measurement;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/6 9:05
*/
@Data
@Measurement(name = "data_harmpower_p")
public class CsDataHarmPowerP extends CsCommon {
}

View File

@@ -1,18 +0,0 @@
package com.njcn.csharmonic.pojo.influx;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.influxdb.annotation.Measurement;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/6 9:27
*/
@EqualsAndHashCode(callSuper = true)
@Data
@Measurement(name = "data_harmrate_i")
public class CsDataHarmRateI extends CsCommon{
}

View File

@@ -1,18 +0,0 @@
package com.njcn.csharmonic.pojo.influx;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.influxdb.annotation.Measurement;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/6 9:27
*/
@EqualsAndHashCode(callSuper = true)
@Data
@Measurement(name = "data_harmrate_v")
public class CsDataHarmRateV extends CsCommon{
}

View File

@@ -1,18 +0,0 @@
package com.njcn.csharmonic.pojo.influx;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.influxdb.annotation.Measurement;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/6 9:27
*/
@EqualsAndHashCode(callSuper = true)
@Data
@Measurement(name = "data_i")
public class CsDataI extends CsCommon{
}

View File

@@ -1,18 +0,0 @@
package com.njcn.csharmonic.pojo.influx;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.influxdb.annotation.Measurement;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/6 9:27
*/
@EqualsAndHashCode(callSuper = true)
@Data
@Measurement(name = "data_v")
public class CsDataV extends CsCommon{
}

View File

@@ -1,18 +0,0 @@
package com.njcn.csharmonic.pojo.influx;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.influxdb.annotation.Measurement;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/6 11:21
*/
@EqualsAndHashCode(callSuper = true)
@Data
@Measurement(name = "pqd_data")
public class CsPqdData extends CsCommon{
}

View File

@@ -1,20 +0,0 @@
package com.njcn.csharmonic.pojo.vo;
import com.njcn.csharmonic.pojo.influx.CsCommon;
import lombok.Data;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/6 11:38
*/
@Data
public class RtDataVO extends CsCommon {
private String targetTag;
private String targetName;
}

View File

@@ -6,9 +6,9 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil; import com.njcn.common.utils.HttpResultUtil;
import com.njcn.csdevice.pojo.vo.LineTargetVO; import com.njcn.csdevice.pojo.vo.LineTargetVO;
import com.njcn.csharmonic.pojo.vo.RtDataVO;
import com.njcn.csharmonic.pojo.vo.TargetDetailVO; import com.njcn.csharmonic.pojo.vo.TargetDetailVO;
import com.njcn.csharmonic.service.ILineTargetService; import com.njcn.csharmonic.service.ILineTargetService;
import com.njcn.influx.pojo.dto.StatisticalDataDTO;
import com.njcn.web.controller.BaseController; import com.njcn.web.controller.BaseController;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParam;
@@ -67,9 +67,9 @@ public class LineTargetController extends BaseController {
@ApiOperation("获取指标数据") @ApiOperation("获取指标数据")
@ApiImplicitParam(name = "id", value = "组态页面id", required = true) @ApiImplicitParam(name = "id", value = "组态页面id", required = true)
@Deprecated @Deprecated
public HttpResult<List<RtDataVO>> getLineData(@RequestParam("id") String id) { public HttpResult<List<StatisticalDataDTO>> getLineData(@RequestParam("id") String id) {
String methodDescribe = getMethodDescribe("getLineData"); String methodDescribe = getMethodDescribe("getLineData");
List<RtDataVO> list = lineTargetService.getLineData(id); List<StatisticalDataDTO> list = lineTargetService.getLineData(id);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, list, methodDescribe); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, list, methodDescribe);
} }

View File

@@ -5,8 +5,8 @@ import com.github.tocrhz.mqtt.annotation.MqttSubscribe;
import com.github.tocrhz.mqtt.annotation.NamedValue; import com.github.tocrhz.mqtt.annotation.NamedValue;
import com.github.tocrhz.mqtt.annotation.Payload; import com.github.tocrhz.mqtt.annotation.Payload;
import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.csharmonic.pojo.vo.RtDataVO;
import com.njcn.csharmonic.service.ILineTargetService; import com.njcn.csharmonic.service.ILineTargetService;
import com.njcn.influx.pojo.dto.StatisticalDataDTO;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -33,7 +33,7 @@ public class MqttMessageHandler {
*/ */
@MqttSubscribe(value = "/zl/askRtData/{pageId}",qos = 1) @MqttSubscribe(value = "/zl/askRtData/{pageId}",qos = 1)
public void responseRtData(String topic, @NamedValue("pageId") String pageId, MqttMessage message, @Payload String payload) { public void responseRtData(String topic, @NamedValue("pageId") String pageId, MqttMessage message, @Payload String payload) {
List<RtDataVO> list = lineTargetService.getLineData(pageId); List<StatisticalDataDTO> list = lineTargetService.getLineData(pageId);
Gson gson = new Gson(); Gson gson = new Gson();
publisher.send("/zl/rtData/"+pageId,gson.toJson(list),1,false); publisher.send("/zl/rtData/"+pageId,gson.toJson(list),1,false);
} }

View File

@@ -1,8 +1,8 @@
package com.njcn.csharmonic.service; package com.njcn.csharmonic.service;
import com.njcn.csdevice.pojo.vo.LineTargetVO; import com.njcn.csdevice.pojo.vo.LineTargetVO;
import com.njcn.csharmonic.pojo.vo.RtDataVO;
import com.njcn.csharmonic.pojo.vo.TargetDetailVO; import com.njcn.csharmonic.pojo.vo.TargetDetailVO;
import com.njcn.influx.pojo.dto.StatisticalDataDTO;
import java.util.List; import java.util.List;
@@ -30,6 +30,6 @@ public interface ILineTargetService {
* 获取绑定指标的数据 * 获取绑定指标的数据
* @param id * @param id
*/ */
List<RtDataVO> getLineData(String id); List<StatisticalDataDTO> getLineData(String id);
} }

View File

@@ -8,35 +8,24 @@ import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsDevModelPO; import com.njcn.csdevice.pojo.po.CsDevModelPO;
import com.njcn.csdevice.pojo.po.CsDevModelRelationPO; import com.njcn.csdevice.pojo.po.CsDevModelRelationPO;
import com.njcn.csdevice.pojo.vo.LineTargetVO; import com.njcn.csdevice.pojo.vo.LineTargetVO;
import com.njcn.csharmonic.constant.HarmonicConstant;
import com.njcn.csharmonic.pojo.influx.CsDataHarmPowerP;
import com.njcn.csharmonic.pojo.influx.CsDataV;
import com.njcn.csharmonic.pojo.influx.CsPqdData;
import com.njcn.csharmonic.pojo.vo.ElementsVO; import com.njcn.csharmonic.pojo.vo.ElementsVO;
import com.njcn.csharmonic.pojo.vo.RtDataVO;
import com.njcn.csharmonic.pojo.vo.TargetDetailVO; import com.njcn.csharmonic.pojo.vo.TargetDetailVO;
import com.njcn.csharmonic.pojo.vo.ZuTaiVo; import com.njcn.csharmonic.pojo.vo.ZuTaiVo;
import com.njcn.csharmonic.service.CsPagePOService; import com.njcn.csharmonic.service.CsPagePOService;
import com.njcn.csharmonic.service.ILineTargetService; import com.njcn.csharmonic.service.ILineTargetService;
import com.njcn.csharmonic.utils.CsReflectUitl;
import com.njcn.influx.pojo.dto.StatisticalDataDTO; import com.njcn.influx.pojo.dto.StatisticalDataDTO;
import com.njcn.influx.service.CommonService; import com.njcn.influx.service.CommonService;
import com.njcn.influxdb.utils.InfluxDbUtils; import com.njcn.influxdb.utils.InfluxDbUtils;
import com.njcn.oss.utils.FileStorageUtil; import com.njcn.oss.utils.FileStorageUtil;
import com.njcn.system.api.EpdFeignClient; import com.njcn.system.api.EpdFeignClient;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -103,8 +92,8 @@ public class LineTargetServiceImpl implements ILineTargetService {
} }
@Override @Override
public List<RtDataVO> getLineData(String id) { public List<StatisticalDataDTO> getLineData(String id) {
List<RtDataVO> result = new ArrayList<>(); List<StatisticalDataDTO> result = new ArrayList<>();
String path = csPagePOService.queryById(id).getPath(); String path = csPagePOService.queryById(id).getPath();
InputStream inputStream = fileStorageUtil.getFileStream(path); InputStream inputStream = fileStorageUtil.getFileStream(path);
List<ElementsVO> list = analysisJson(inputStream); List<ElementsVO> list = analysisJson(inputStream);
@@ -119,7 +108,7 @@ public class LineTargetServiceImpl implements ILineTargetService {
String lineId = item.getLineId().stream().reduce((first, second) -> second).orElse("no last element"); String lineId = item.getLineId().stream().reduce((first, second) -> second).orElse("no last element");
String dataType = item.getDataType(); String dataType = item.getDataType();
String phasic = item.getPhasic(); String phasic = item.getPhasic();
result.add(getLineRtData2(lineId,classId,targetTag,phasic,dataType,targetName)); result.add(getLineRtData(lineId,classId,targetTag,phasic,dataType,targetName));
} }
}); });
} }
@@ -146,57 +135,6 @@ public class LineTargetServiceImpl implements ILineTargetService {
return zuTai.getElements(); return zuTai.getElements();
} }
/**
* 通过拼接sql语句获取方法
* @param lineId 监测点Id
* @param tableName 表名称
* @param columnName 字段名称
* @param phasic 相别
* @param dataType 数据类型
* @return
*/
public <T> T getLineRtData(String lineId, String tableName, String columnName, String phasic, String dataType) {
T t = null;
HashMap<String, Class<?>> entityClassesByAnnotation = CsReflectUitl.getEntityClassesByAnnotation();
Class<?> aClass = entityClassesByAnnotation.get(tableName);
String sql = "select line_id,phasic_type,value_type,last(" + columnName + ") AS rtData from "+ tableName +" where line_id = '" + lineId + "' and phasic_type = '" + phasic + "' and value_type = '" + dataType + "' tz('Asia/Shanghai')";
System.out.println("sql==:" + sql);
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<?> list = resultMapper.toPOJO(sqlResult, aClass);
if (CollectionUtil.isNotEmpty(list)){
t = (T)list.get(0);
}
return t;
}
/**
* 通过拼接sql语句获取方法
* @param lineId 监测点Id
* @param tableName 表名称
* @param columnName 字段名称
* @param phasic 相别
* @param dataType 数据类型
* @return
*/
public RtDataVO getLineRtData2(String lineId, String tableName, String columnName, String phasic, String dataType, String target) {
RtDataVO vo = new RtDataVO();
String sql = "select line_id,phasic_type,value_type,last(" + columnName + ") AS rtData from "+ tableName +" where line_id = '" + lineId + "' and phasic_type = '" + phasic + "' and value_type = '" + dataType + "' tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
if (CollectionUtil.isNotEmpty(sqlResult.getResults().get(0).getSeries())){
vo.setRtData(BigDecimal.valueOf(Double.parseDouble(sqlResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(4).toString())).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue());
} else {
vo.setRtData(3.1415926);
}
vo.setLineId(lineId);
vo.setPhaseType(phasic);
vo.setValueType(dataType);
vo.setValueType(dataType);
vo.setTargetName(target);
vo.setTargetTag(columnName);
return vo;
}
/** /**
* 通过orm框架获取数据 * 通过orm框架获取数据
* @param lineId 监测点Id * @param lineId 监测点Id
@@ -204,38 +142,12 @@ public class LineTargetServiceImpl implements ILineTargetService {
* @param columnName 字段名称 * @param columnName 字段名称
* @param phasic 相别 * @param phasic 相别
* @param dataType 数据类型 * @param dataType 数据类型
* @param target 数据名称
* @return * @return
*/ */
public StatisticalDataDTO getLineRtData3(String lineId, String tableName, String columnName, String phasic, String dataType, String target) { public StatisticalDataDTO getLineRtData(String lineId, String tableName, String columnName, String phasic, String dataType, String target) {
return commonService.getLineRtData(lineId,tableName,columnName,phasic,dataType); StatisticalDataDTO statisticalDataDTO = commonService.getLineRtData(lineId,tableName,columnName,phasic,dataType);
statisticalDataDTO.setStatisticalName(target);
return statisticalDataDTO;
} }
/**
* 根据表明返回不同数据结果
*/
public RtDataVO findDataByTableName(String lineId, String tableName, String columnName, String phasic, String dataType,String targetName) {
RtDataVO rtDataVO = new RtDataVO();
switch (tableName) {
case HarmonicConstant.DATA_HARM_POWER_P:
CsDataHarmPowerP rtData1 = getLineRtData(lineId,tableName,targetName,phasic,dataType);
BeanUtils.copyProperties(rtData1,rtDataVO);
rtDataVO.setTargetName(targetName);
break;
case HarmonicConstant.DATA_V:
CsDataV rtData2 = getLineRtData(lineId,tableName,targetName,phasic,dataType);
BeanUtils.copyProperties(rtData2,rtDataVO);
rtDataVO.setTargetName(targetName);
break;
case HarmonicConstant.PQD_DATA:
CsPqdData rtData3 = getLineRtData(lineId,tableName,targetName,phasic,dataType);
BeanUtils.copyProperties(rtData3,rtDataVO);
rtDataVO.setTargetName(targetName);
break;
default:
break;
}
return rtDataVO;
}
} }