组态功能开发

This commit is contained in:
2023-06-06 16:42:45 +08:00
parent befb138068
commit 0b65b55d28
22 changed files with 613 additions and 23 deletions

View File

@@ -6,6 +6,7 @@ import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.csdevice.pojo.vo.LineTargetVO;
import com.njcn.harmonic.pojo.vo.RtDataVO;
import com.njcn.harmonic.pojo.vo.TargetDetailVO;
import com.njcn.harmonic.service.ILineTargetService;
import com.njcn.web.controller.BaseController;
@@ -65,10 +66,11 @@ public class LineTargetController extends BaseController {
@PostMapping("/lineData")
@ApiOperation("获取指标数据")
@ApiImplicitParam(name = "id", value = "组态页面id", required = true)
public HttpResult<String> getLineData(@RequestParam("id") String id) {
@Deprecated
public HttpResult<List<RtDataVO>> getLineData(@RequestParam("id") String id) {
String methodDescribe = getMethodDescribe("getLineData");
lineTargetService.getLineData(id);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
List<RtDataVO> list = lineTargetService.getLineData(id);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, list, methodDescribe);
}
}

View File

@@ -0,0 +1,40 @@
package com.njcn.harmonic.handler;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.annotation.MqttSubscribe;
import com.github.tocrhz.mqtt.annotation.NamedValue;
import com.github.tocrhz.mqtt.annotation.Payload;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.harmonic.pojo.vo.RtDataVO;
import com.njcn.harmonic.service.ILineTargetService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author hongawen
* @version 1.0.0
* @date 2022年03月23日 09:41
*/
@Slf4j
@Component
@AllArgsConstructor
public class MqttMessageHandler {
private final MqttPublisher publisher;
private final ILineTargetService lineTargetService;
/**
* 实时数据应答
*/
@MqttSubscribe(value = "/zl/askRtData/{pageId}",qos = 1)
public void responseRtData(String topic, @NamedValue("pageId") String pageId, MqttMessage message, @Payload String payload) {
List<RtDataVO> list = lineTargetService.getLineData(pageId);
Gson gson = new Gson();
publisher.send("/zl/rtData/"+pageId,gson.toJson(list),1,false);
}
}

View File

@@ -1,6 +1,7 @@
package com.njcn.harmonic.service;
import com.njcn.csdevice.pojo.vo.LineTargetVO;
import com.njcn.harmonic.pojo.vo.RtDataVO;
import com.njcn.harmonic.pojo.vo.TargetDetailVO;
import java.util.List;
@@ -29,7 +30,6 @@ public interface ILineTargetService {
* 获取绑定指标的数据
* @param id
*/
void getLineData(String id);
List<RtDataVO> getLineData(String id);
}

View File

@@ -1,18 +1,31 @@
package com.njcn.harmonic.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsDevModelPO;
import com.njcn.csdevice.pojo.po.CsDevModelRelationPO;
import com.njcn.csdevice.pojo.vo.LineTargetVO;
import com.njcn.harmonic.constant.HarmonicConstant;
import com.njcn.harmonic.pojo.influx.CsDataHarmPowerP;
import com.njcn.harmonic.pojo.influx.CsDataV;
import com.njcn.harmonic.pojo.influx.CsPqdData;
import com.njcn.harmonic.pojo.vo.ElementsVO;
import com.njcn.harmonic.pojo.vo.RtDataVO;
import com.njcn.harmonic.pojo.vo.TargetDetailVO;
import com.njcn.harmonic.pojo.vo.ZuTaiVo;
import com.njcn.harmonic.service.CsPagePOService;
import com.njcn.harmonic.service.ILineTargetService;
import com.njcn.harmonic.utils.CsReflectUitl;
import com.njcn.influxdb.utils.InfluxDbUtils;
import com.njcn.oss.utils.FileStorageUtil;
import com.njcn.system.api.EpdFeignClient;
import lombok.AllArgsConstructor;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.io.BufferedReader;
@@ -20,6 +33,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -49,6 +63,13 @@ public class LineTargetServiceImpl implements ILineTargetService {
private final CsPagePOService csPagePOService;
private final MqttPublisher publisher;
private final InfluxDbUtils influxDbUtils;
private final EpdFeignClient epdFeignClient;
@Override
public List<LineTargetVO> getLineTarget(String lineId) {
List<LineTargetVO> list = new ArrayList<>();
@@ -77,12 +98,26 @@ public class LineTargetServiceImpl implements ILineTargetService {
}
@Override
public void getLineData(String id) {
public List<RtDataVO> getLineData(String id) {
List<RtDataVO> result = new ArrayList<>();
String path = csPagePOService.queryById(id).getPath();
InputStream inputStream = fileStorageUtil.getFileStream(path);
List<ElementsVO> list = analysisJson(inputStream);
System.out.println("list==:" + list);
//influx的原始方法获取数据
if (CollectionUtil.isNotEmpty(list)){
list.forEach(item->{
String targetName = item.getTargetId().stream().reduce((first, second) -> second).orElse("no last element");
if (CollectionUtil.isNotEmpty(dataArrayFeignClient.getDataArrayById(item.getTargetPid(),targetName).getData())){
String dataId = dataArrayFeignClient.getDataArrayById(item.getTargetPid(),targetName).getData().get(0).getDataId();
String classId = epdFeignClient.selectById(dataId).getData().getClassId();
String lineId = item.getLineId().stream().reduce((first, second) -> second).orElse("no last element");
String dataType = item.getDataType();
String phasic = item.getPhasic();
result.add(getLineRtData2(lineId,classId,targetName,phasic,dataType,targetName));
}
});
}
return result;
}
/**
@@ -105,4 +140,78 @@ public class LineTargetServiceImpl implements ILineTargetService {
return zuTai.getElements();
}
/**
* 通过拼接sql语句获取方法
* @param lineId 监测点Id
* @param tableName 表名称
* @param columnName 字段名称
* @param phasic 相别
* @param dataType 数据类型
* @return
*/
public <T> T getLineRtData(String lineId, String tableName, String columnName, String phasic, String dataType) {
T t = null;
HashMap<String, Class<?>> entityClassesByAnnotation = CsReflectUitl.getEntityClassesByAnnotation();
Class<?> aClass = entityClassesByAnnotation.get(tableName);
String sql = "select line_id,phasic_type,value_type,last(" + columnName + ") AS rtData from "+ tableName +" where line_id = '" + lineId + "' and phasic_type = '" + phasic + "' and value_type = '" + dataType + "' tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<?> list = resultMapper.toPOJO(sqlResult, aClass);
if (CollectionUtil.isNotEmpty(list)){
t = (T)list.get(0);
}
return t;
}
/**
* 通过拼接sql语句获取方法
* @param lineId 监测点Id
* @param tableName 表名称
* @param columnName 字段名称
* @param phasic 相别
* @param dataType 数据类型
* @return
*/
public RtDataVO getLineRtData2(String lineId, String tableName, String columnName, String phasic, String dataType, String target) {
RtDataVO vo = new RtDataVO();
String sql = "select line_id,phasic_type,value_type,last(" + columnName + ") AS rtData from "+ tableName +" where line_id = '" + lineId + "' and phasic_type = '" + phasic + "' and value_type = '" + dataType + "' tz('Asia/Shanghai')";
QueryResult sqlResult = influxDbUtils.query(sql);
if (CollectionUtil.isNotEmpty(sqlResult.getResults())){
vo.setRtData(Double.parseDouble(sqlResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(4).toString()));
}
vo.setLineId(lineId);
vo.setPhaseType(phasic);
vo.setValueType(dataType);
vo.setValueType(dataType);
vo.setTargetName(target);
return vo;
}
/**
* 根据表明返回不同数据结果
*/
public RtDataVO findDataByTableName(String lineId, String tableName, String columnName, String phasic, String dataType,String targetName) {
RtDataVO rtDataVO = new RtDataVO();
switch (tableName) {
case HarmonicConstant.DATA_HARM_POWER_P:
CsDataHarmPowerP rtData1 = getLineRtData(lineId,tableName,targetName,phasic,dataType);
BeanUtils.copyProperties(rtData1,rtDataVO);
rtDataVO.setTargetName(targetName);
break;
case HarmonicConstant.DATA_V:
CsDataV rtData2 = getLineRtData(lineId,tableName,targetName,phasic,dataType);
BeanUtils.copyProperties(rtData2,rtDataVO);
rtDataVO.setTargetName(targetName);
break;
case HarmonicConstant.PQD_DATA:
CsPqdData rtData3 = getLineRtData(lineId,tableName,targetName,phasic,dataType);
BeanUtils.copyProperties(rtData3,rtDataVO);
rtDataVO.setTargetName(targetName);
break;
default:
break;
}
return rtDataVO;
}
}

View File

@@ -0,0 +1,61 @@
package com.njcn;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.harmonic.pojo.vo.RtDataVO;
import com.njcn.harmonic.service.ILineTargetService;
import com.njcn.influxdb.param.InfluxDBPublicParam;
import com.njcn.influxdb.utils.InfluxDbUtils;
import lombok.AllArgsConstructor;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 类的介绍:
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/6/5 14:44
*/
@AllArgsConstructor
public class InfluxDbTest {
private final MqttPublisher publisher;
public static void main(String[] args) {
}
public void insert() {
InfluxDbUtils influxDbUtils = new InfluxDbUtils("admin", "123456", "http://192.168.1.16:8086", "pqsbase_zl", "");
List<String> records = new ArrayList<>();
List<String> phasic = Arrays.asList("A","B","C");
List<String> dataType = Arrays.asList("max","min","avg","cp95");
long time = System.currentTimeMillis();
for (String item1 : phasic) {
for (String item2 : dataType) {
Map<String, String> tags = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
tags.put("line_id","4");
tags.put("phasic_type",item1);
tags.put("value_type",item2);
fields.put("W",new Random().nextDouble());
fields.put("PhV",new Random().nextDouble());
Point point = influxDbUtils.pointBuilder("pqd_data", time, TimeUnit.MILLISECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName ()).tag(InfluxDBPublicParam.LINE_ID, "4").tag(InfluxDBPublicParam.PHASIC_TYPE,item1).tag(InfluxDBPublicParam.VALUE_TYPE,item2).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
}
}
influxDbUtils.batchInsert(influxDbUtils.getDbName (),"", InfluxDB.ConsistencyLevel.ALL, records);
}
}