新增用能定时任务
This commit is contained in:
@@ -2,18 +2,26 @@ package com.njcn.energy.controller;
|
||||
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
||||
import com.njcn.energy.pojo.api.EleIntegrityFeignClient;
|
||||
import com.njcn.energy.pojo.api.EleOnlineRateFeignClient;
|
||||
import com.njcn.energy.pojo.dto.HarmonicDTO;
|
||||
import com.njcn.energy.pojo.dto.OnlineRateDTO;
|
||||
import com.njcn.influx.config.InfluxDbConfig;
|
||||
import com.njcn.influx.utils.InfluxDbUtils;
|
||||
import io.swagger.annotations.Api;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.influxdb.InfluxDB;
|
||||
import org.influxdb.dto.BatchPoints;
|
||||
import org.influxdb.dto.Point;
|
||||
import org.influxdb.dto.QueryResult;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
@@ -27,10 +35,18 @@ import java.util.stream.Collectors;
|
||||
@RequestMapping("energyJob")
|
||||
public class StatisticDataRunController {
|
||||
|
||||
private final Integer POWER_DATA_DUE = 96;
|
||||
|
||||
private final Integer AIR_DATA_DUE = 288;
|
||||
|
||||
private final InfluxDbUtils influxDbUtils;
|
||||
|
||||
private final InfluxDbConfig influxDbConfig;
|
||||
|
||||
private final EleOnlineRateFeignClient eleOnlineRateFeignClient;
|
||||
|
||||
private final EleIntegrityFeignClient eleIntegrityFeignClient;
|
||||
|
||||
|
||||
/**
|
||||
* 调度统计电量增量
|
||||
@@ -149,4 +165,178 @@ public class StatisticDataRunController {
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 用能终端在线率数据
|
||||
* -------------------------------------------------start-----------------------------------------------------------
|
||||
*/
|
||||
@GetMapping("eleOnlineRateJob")
|
||||
public void eleOnlineRateJobHandler() {
|
||||
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH)-1, 0, 0, 0);
|
||||
calendar.set(Calendar.MILLISECOND, 0);
|
||||
Calendar calendar2 = Calendar.getInstance();
|
||||
calendar2.set(calendar2.get(Calendar.YEAR), calendar2.get(Calendar.MONTH), calendar2.get(Calendar.DAY_OF_MONTH)-1, 23, 59, 59);
|
||||
calendar2.set(Calendar.MILLISECOND, 0);
|
||||
String startTime = format.format(calendar.getTime());
|
||||
String endTime = format.format(calendar2.getTime());
|
||||
List<OnlineRateDTO> list = eleOnlineRateFeignClient.getDeviceTime(startTime,endTime).getData();
|
||||
createOnlineMeasurement(list,calendar.getTimeInMillis());
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成用能终端在线率表
|
||||
*/
|
||||
private void createOnlineMeasurement(List<OnlineRateDTO> list, long time){
|
||||
List<String> records = new ArrayList<String>();
|
||||
list.forEach(item->{
|
||||
Map<String, String> tags = new HashMap<>();
|
||||
Map<String, Object> fields = new HashMap<>();
|
||||
tags.put("device_id",item.getDeviceId());
|
||||
fields.put("online_rate",item.getOnlineRate());
|
||||
Point point = influxDbUtils.pointBuilder("ele_online_rate", time, TimeUnit.MILLISECONDS, tags, fields);
|
||||
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag("device_id", item.getDeviceId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
|
||||
batchPoints.point(point);
|
||||
records.add(batchPoints.lineProtocol());
|
||||
});
|
||||
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
|
||||
}
|
||||
|
||||
/**
|
||||
* -------------------------------------------------end-------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
/**
|
||||
* 用能数据完整性数据
|
||||
* -------------------------------------------------start-----------------------------------------------------------
|
||||
*/
|
||||
@GetMapping("eleIntegrityJob")
|
||||
public void eleIntegrityJobHandler() {
|
||||
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH)-1, 0, 0, 0);
|
||||
calendar.set(Calendar.MILLISECOND, 0);
|
||||
Calendar calendar2 = Calendar.getInstance();
|
||||
calendar2.set(calendar2.get(Calendar.YEAR), calendar2.get(Calendar.MONTH), calendar2.get(Calendar.DAY_OF_MONTH)-1, 23, 59, 59);
|
||||
calendar2.set(Calendar.MILLISECOND, 0);
|
||||
String startTime = format.format(calendar.getTime());
|
||||
String endTime = format.format(calendar2.getTime());
|
||||
|
||||
List<HarmonicDTO> powerData = getPowerData(startTime,endTime);
|
||||
if(!CollectionUtils.isEmpty(powerData)){
|
||||
createMeasurement(powerData,calendar.getTimeInMillis());
|
||||
} else {
|
||||
List<HarmonicDTO> result = new ArrayList<>();
|
||||
//查询当前网关下所有监测点id
|
||||
List<String> lineList = eleIntegrityFeignClient.getPowerLineId().getData();
|
||||
lineList.forEach(item->{
|
||||
HarmonicDTO harmonicDTO = new HarmonicDTO();
|
||||
harmonicDTO.setId(item);
|
||||
harmonicDTO.setReal(0);
|
||||
harmonicDTO.setDue(96);
|
||||
result.add(harmonicDTO);
|
||||
});
|
||||
createMeasurement(result,calendar.getTimeInMillis());
|
||||
}
|
||||
List<HarmonicDTO> airData = getAirData(startTime,endTime);
|
||||
if(!CollectionUtils.isEmpty(airData)){
|
||||
powerData = new ArrayList<>(airData);
|
||||
createMeasurement(powerData,calendar.getTimeInMillis());
|
||||
} else {
|
||||
List<HarmonicDTO> result = new ArrayList<>();
|
||||
//查询当前网关下所有监测点id
|
||||
List<String> lineList = eleIntegrityFeignClient.getAirLineId().getData();
|
||||
lineList.forEach(item->{
|
||||
HarmonicDTO harmonicDTO = new HarmonicDTO();
|
||||
harmonicDTO.setId(item);
|
||||
harmonicDTO.setReal(0);
|
||||
harmonicDTO.setDue(288);
|
||||
result.add(harmonicDTO);
|
||||
});
|
||||
createMeasurement(result,calendar.getTimeInMillis());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成power_data数据
|
||||
*/
|
||||
private List<HarmonicDTO> getPowerData(String startTime, String endTime){
|
||||
List<HarmonicDTO> list = new ArrayList<>();
|
||||
String sql = "SELECT count(FundWattHr) FROM power_data where time >= '"+startTime+"' and time <= '"+endTime+"' group by LineId";
|
||||
QueryResult sqlResult = influxDbUtils.query(sql);
|
||||
//处理结果集
|
||||
List<QueryResult.Series> seriesList = sqlResult.getResults().get(0).getSeries();
|
||||
if (!CollectionUtils.isEmpty(seriesList)){
|
||||
seriesList.forEach(po -> {
|
||||
HarmonicDTO harmonicDTO = new HarmonicDTO();
|
||||
String lineId = po.getTags().get("LineId");
|
||||
List<List<Object>> countList = po.getValues();
|
||||
for (List<Object> value : countList) {
|
||||
//当日实际数据
|
||||
Integer count = new BigDecimal(value.get(1).toString()).intValue();
|
||||
harmonicDTO.setId(lineId);
|
||||
harmonicDTO.setReal(count);
|
||||
harmonicDTO.setDue(POWER_DATA_DUE);
|
||||
}
|
||||
list.add(harmonicDTO);
|
||||
});
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成air_data数据
|
||||
*/
|
||||
private List<HarmonicDTO> getAirData(String startTime, String endTime){
|
||||
List<HarmonicDTO> list = new ArrayList<>();
|
||||
String sql = "SELECT count(ACInMode),count(ACOutMode) FROM air_data where time >='"+startTime+"' and time <= '"+endTime+"' group by LineId";
|
||||
QueryResult sqlResult = influxDbUtils.query(sql);
|
||||
//处理结果集
|
||||
List<QueryResult.Series> seriesList = sqlResult.getResults().get(0).getSeries();
|
||||
if (!CollectionUtils.isEmpty(seriesList)){
|
||||
seriesList.forEach(po -> {
|
||||
HarmonicDTO harmonicDTO = new HarmonicDTO();
|
||||
String lineId = po.getTags().get("LineId");
|
||||
List<List<Object>> countList = po.getValues();
|
||||
for (List<Object> value : countList) {
|
||||
//当日实际数据
|
||||
if (!Objects.isNull(value.get(1))){
|
||||
harmonicDTO.setReal(new BigDecimal(value.get(1).toString()).intValue());
|
||||
} else {
|
||||
harmonicDTO.setReal(new BigDecimal(value.get(2).toString()).intValue());
|
||||
}
|
||||
harmonicDTO.setId(lineId);
|
||||
harmonicDTO.setDue(AIR_DATA_DUE);
|
||||
}
|
||||
list.add(harmonicDTO);
|
||||
});
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成用能数据完整性表
|
||||
*/
|
||||
private void createMeasurement(List<HarmonicDTO> list, long time){
|
||||
List<String> records = new ArrayList<String>();
|
||||
list.forEach(item->{
|
||||
Map<String, String> tags = new HashMap<>();
|
||||
Map<String, Object> fields = new HashMap<>();
|
||||
tags.put("line_id",item.getId());
|
||||
fields.put("real",item.getReal());
|
||||
fields.put("due",item.getDue());
|
||||
Point point = influxDbUtils.pointBuilder("ele_integrity", time, TimeUnit.MILLISECONDS, tags, fields);
|
||||
BatchPoints batchPoints = BatchPoints.database(influxDbConfig.getDatabase()).tag("line_id", item.getId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
|
||||
batchPoints.point(point);
|
||||
records.add(batchPoints.lineProtocol());
|
||||
});
|
||||
influxDbUtils.batchInsert(influxDbConfig.getDatabase(),"", InfluxDB.ConsistencyLevel.ALL, records);
|
||||
}
|
||||
|
||||
/**
|
||||
* -------------------------------------------------end-------------------------------------------------------------
|
||||
*/
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user