diff --git a/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/param/InfluxDBPublicParam.java b/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/param/InfluxDBPublicParam.java index d052bef1f..b73b3878c 100644 --- a/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/param/InfluxDBPublicParam.java +++ b/pqs-common/common-influxdb/src/main/java/com/njcn/influxdb/param/InfluxDBPublicParam.java @@ -186,6 +186,11 @@ public interface InfluxDBPublicParam { */ String LINE_ID = "line_id"; + /** + * 装置ID + */ + String DEV_ID = "dev_id"; + /** * 数据类型 */ @@ -239,9 +244,11 @@ public interface InfluxDBPublicParam { /** * 终端在线率表公共字段 */ - String ONLINEMIN = "onlinemin"; + String ONLINE_MIN = "online_min"; - String OFFLINEMIN = "offlinemin"; + String OFFLINE_MIN = "offline_min"; + + String ONLINE_RATE = "online_rate"; /** * data表中InfluxDBPublicParam diff --git a/pqs-device/device-api/src/main/java/com/njcn/device/api/LineFeignClient.java b/pqs-device/device-api/src/main/java/com/njcn/device/api/LineFeignClient.java index 6508ad899..1b480e355 100644 --- a/pqs-device/device-api/src/main/java/com/njcn/device/api/LineFeignClient.java +++ b/pqs-device/device-api/src/main/java/com/njcn/device/api/LineFeignClient.java @@ -201,4 +201,13 @@ public interface LineFeignClient { @PostMapping("getLineDetail") HttpResult> getLineDetail(@RequestParam(required = false,value = "list") List list); + /** + * 功能描述: 获取指定条件的装置id(实际装置、投运、稳态或者双系统) + * @author xy + * @date 2022/7/8 14:24 + * @return 装置id集合 + */ + @PostMapping("getDeviceList") + HttpResult> getDeviceList(); + } diff --git a/pqs-device/device-api/src/main/java/com/njcn/device/api/fallback/LineFeignClientFallbackFactory.java b/pqs-device/device-api/src/main/java/com/njcn/device/api/fallback/LineFeignClientFallbackFactory.java index fc88ba0c5..fa131a136 100644 --- a/pqs-device/device-api/src/main/java/com/njcn/device/api/fallback/LineFeignClientFallbackFactory.java +++ b/pqs-device/device-api/src/main/java/com/njcn/device/api/fallback/LineFeignClientFallbackFactory.java @@ -155,6 +155,12 @@ public class LineFeignClientFallbackFactory implements FallbackFactory> getDeviceList() { + log.error("{}异常,降级处理,异常为:{}", "获取装置Id集合: ", throwable.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; } diff --git a/pqs-device/device-boot/src/main/java/com/njcn/device/controller/LineController.java b/pqs-device/device-boot/src/main/java/com/njcn/device/controller/LineController.java index 5a11894c7..ce68696e3 100644 --- a/pqs-device/device-boot/src/main/java/com/njcn/device/controller/LineController.java +++ b/pqs-device/device-boot/src/main/java/com/njcn/device/controller/LineController.java @@ -272,10 +272,20 @@ public class LineController extends BaseController { @PostMapping("/getLineDetail") @ApiOperation("根据监测点集合获取监测点详情") @ApiImplicitParam(name = "list", value = "监测点集合") + @ApiIgnore public HttpResult> getLineDetail(@RequestParam(required = false) List list) { String methodDescribe = getMethodDescribe("getLineDetail"); LogUtil.njcnDebug(log, "{},监测点集合:{}", methodDescribe, list); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, lineDetailMapper.getSpecifyLineDetail(list), methodDescribe); } + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/getDeviceList") + @ApiOperation("获取生成在线率的装置Id") + @ApiIgnore + public HttpResult> getDeviceList() { + String methodDescribe = getMethodDescribe("getDeviceList"); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, lineMapper.getDeviceList(), methodDescribe); + } + } diff --git a/pqs-device/device-boot/src/main/java/com/njcn/device/mapper/LineMapper.java b/pqs-device/device-boot/src/main/java/com/njcn/device/mapper/LineMapper.java index 6729d960d..5a898ea25 100644 --- a/pqs-device/device-boot/src/main/java/com/njcn/device/mapper/LineMapper.java +++ b/pqs-device/device-boot/src/main/java/com/njcn/device/mapper/LineMapper.java @@ -313,6 +313,9 @@ public interface LineMapper extends BaseMapper { */ List getVoltageListBySubId(@Param("subId")String subId,@Param("voltageName")List voltageName); - - + /** + * 获取生成在线率的装置Id + * @return 装置Id + */ + List getDeviceList(); } diff --git a/pqs-device/device-boot/src/main/java/com/njcn/device/mapper/mapping/LineMapper.xml b/pqs-device/device-boot/src/main/java/com/njcn/device/mapper/mapping/LineMapper.xml index 215beabc7..e10d92078 100644 --- a/pqs-device/device-boot/src/main/java/com/njcn/device/mapper/mapping/LineMapper.xml +++ b/pqs-device/device-boot/src/main/java/com/njcn/device/mapper/mapping/LineMapper.xml @@ -948,4 +948,8 @@ FROM + + diff --git a/pqs-device/device-boot/src/main/java/com/njcn/device/service/impl/RunManageServiceImpl.java b/pqs-device/device-boot/src/main/java/com/njcn/device/service/impl/RunManageServiceImpl.java index 9c7f0a61f..37e3adf69 100644 --- a/pqs-device/device-boot/src/main/java/com/njcn/device/service/impl/RunManageServiceImpl.java +++ b/pqs-device/device-boot/src/main/java/com/njcn/device/service/impl/RunManageServiceImpl.java @@ -19,15 +19,14 @@ import com.njcn.device.pojo.po.LineDetail; import com.njcn.device.pojo.vo.*; import com.njcn.device.service.LineService; import com.njcn.device.service.RunManageService; -import com.njcn.influxdb.param.InfluxDBPublicParam; +import com.njcn.device.service.TerminalBaseService; import com.njcn.influxdb.utils.InfluxDbUtils; import com.njcn.system.api.AreaFeignClient; import com.njcn.system.api.DicDataFeignClient; -import com.njcn.web.pojo.vo.LineDataVO; -import com.njcn.device.service.TerminalBaseService; import com.njcn.system.enums.DicDataTypeEnum; import com.njcn.system.pojo.enums.StatisticsEnum; import com.njcn.system.pojo.po.DictData; +import com.njcn.web.pojo.vo.LineDataVO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.influxdb.dto.QueryResult; @@ -44,6 +43,8 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; +import static com.njcn.influxdb.param.InfluxDBPublicParam.*; + /** * @author denghuajun * @version 1.0.0 @@ -198,9 +199,9 @@ public class RunManageServiceImpl implements RunManageService { QueryResult queryResult; if (!lineId.isEmpty()) { //组装sql语句 - String stringBuilder = "time >= '" + startTime + "' and " + "time <= '" + endTime + "' and (" + InfluxDBPublicParam.DEV_INDEX +" ='" + lineId + "') "; + String stringBuilder = "time >= '" + startTime + "' and " + "time <= '" + endTime + "' and (" + DEV_INDEX +" ='" + lineId + "') "; //sql语句 - String sql = "SELECT SUM("+InfluxDBPublicParam.ONLINEMIN+")/(SUM("+InfluxDBPublicParam.OFFLINEMIN+")+SUM("+InfluxDBPublicParam.ONLINEMIN+")) AS onlineRate FROM "+InfluxDBPublicParam.PQS_ONLINERATE+" WHERE " + stringBuilder+InfluxDBPublicParam.TIME_ZONE; + String sql = "SELECT SUM("+ ONLINE_MIN+")/(SUM("+OFFLINE_MIN+")+SUM("+ONLINE_MIN+")) AS onlineRate FROM "+PQS_ONLINERATE+" WHERE " + stringBuilder+TIME_ZONE; queryResult = influxDbUtils.query(sql); //处理结果集 List list = queryResult.getResults().get(0).getSeries(); diff --git a/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/CommunicateServiceImpl.java b/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/CommunicateServiceImpl.java index 6196cac30..b6610f505 100644 --- a/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/CommunicateServiceImpl.java +++ b/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/CommunicateServiceImpl.java @@ -2,12 +2,10 @@ package com.njcn.harmonic.service.impl; import com.njcn.device.api.LineFeignClient; import com.njcn.device.pojo.vo.CommunicateVO; -import com.njcn.harmonic.constant.Param; import com.njcn.harmonic.pojo.param.PulicTimeParam; import com.njcn.harmonic.pojo.param.PulicTimeStatisParam; import com.njcn.harmonic.pojo.vo.CommunicateStatisticsVO; import com.njcn.harmonic.service.CommunicateService; -import com.njcn.influxdb.param.InfluxDBPublicParam; import com.njcn.influxdb.utils.InfluxDbUtils; import lombok.AllArgsConstructor; import org.apache.commons.lang.StringUtils; @@ -19,6 +17,8 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; +import static com.njcn.influxdb.param.InfluxDBPublicParam.*; + /** * @author denghuajun * @date 2022/2/28 @@ -64,17 +64,17 @@ public class CommunicateServiceImpl implements CommunicateService { final Float[] resultList = {0.0f}; //组装sql语句 StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append(InfluxDBPublicParam.TIME + " >= '").append(startTime).append(InfluxDBPublicParam.START_TIME).append("' and ").append(InfluxDBPublicParam.TIME).append(" <= '").append(endTime).append(InfluxDBPublicParam.END_TIME).append("' and ("); + stringBuilder.append(TIME + " >= '").append(startTime).append(START_TIME).append("' and ").append(TIME).append(" <= '").append(endTime).append(END_TIME).append("' and ("); //sql语句 String sql = ""; if (state == 0) { - stringBuilder.append(InfluxDBPublicParam.LINE_ID + "='").append(lineList).append("')"); - sql = "SELECT SUM(" + InfluxDBPublicParam.REAL+ ")/SUM(" + InfluxDBPublicParam.DUE + ") AS integrity FROM pqs_integrity WHERE " + stringBuilder.toString() + " group by " + InfluxDBPublicParam.LINE_ID; + stringBuilder.append(LINE_ID + "='").append(lineList).append("')"); + sql = "SELECT SUM(" + REAL+ ")/SUM(" + DUE + ") AS integrity FROM pqs_integrity WHERE " + stringBuilder.toString() + " group by " + LINE_ID; } else { - stringBuilder.append(InfluxDBPublicParam.DEV_INDEX + "='").append(lineList).append("')"); - sql = "SELECT SUM(" + InfluxDBPublicParam.ONLINEMIN + ")/(SUM(" + InfluxDBPublicParam.OFFLINEMIN + ")+SUM(" + InfluxDBPublicParam.ONLINEMIN + ")) AS onlineRate FROM pqs_onlinerate WHERE " + stringBuilder.toString() + " group by " + InfluxDBPublicParam.DEV_INDEX; + stringBuilder.append(DEV_INDEX + "='").append(lineList).append("')"); + sql = "SELECT SUM(" + ONLINE_MIN + ")/(SUM(" + OFFLINE_MIN + ")+SUM(" + ONLINE_MIN + ")) AS onlineRate FROM pqs_onlinerate WHERE " + stringBuilder.toString() + " group by " + DEV_INDEX; } - stringBuilder.append(InfluxDBPublicParam.TIME_ZONE); + stringBuilder.append(TIME_ZONE); //结果集 QueryResult result = influxDbUtils.query(sql); //处理结果集 diff --git a/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/OnlineRateDataServiceImpl.java b/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/OnlineRateDataServiceImpl.java index 11d3770c1..5947da47c 100644 --- a/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/OnlineRateDataServiceImpl.java +++ b/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/OnlineRateDataServiceImpl.java @@ -10,7 +10,6 @@ import com.njcn.harmonic.pojo.dto.PublicDTO; import com.njcn.harmonic.pojo.vo.OnlineRateCensusVO; import com.njcn.harmonic.pojo.vo.OnlineRateVO; import com.njcn.harmonic.service.OnlineRateDataService; -import com.njcn.influxdb.param.InfluxDBPublicParam; import com.njcn.influxdb.utils.InfluxDbUtils; import lombok.AllArgsConstructor; import org.influxdb.dto.QueryResult; @@ -23,6 +22,8 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; +import static com.njcn.influxdb.param.InfluxDBPublicParam.*; + /** * 类的介绍: * @@ -145,16 +146,16 @@ public class OnlineRateDataServiceImpl implements OnlineRateDataService { List publicDTOList = new ArrayList<>(); //组装sql语句 StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append(InfluxDBPublicParam.TIME + " >= '").append(startTime).append(InfluxDBPublicParam.START_TIME).append("' and ").append(InfluxDBPublicParam.TIME).append(" <= '").append(endTime).append(InfluxDBPublicParam.END_TIME).append("' and ("); + stringBuilder.append(TIME + " >= '").append(startTime).append(START_TIME).append("' and ").append(TIME).append(" <= '").append(endTime).append(END_TIME).append("' and ("); for (int i = 0; i < deviceIndexes.size(); i++) { if (deviceIndexes.size() - i != 1) { - stringBuilder.append(InfluxDBPublicParam.DEV_INDEX + "='").append(deviceIndexes.get(i)).append("' or "); + stringBuilder.append(DEV_INDEX + "='").append(deviceIndexes.get(i)).append("' or "); } else { - stringBuilder.append(InfluxDBPublicParam.DEV_INDEX + "='").append(deviceIndexes.get(i)).append("')"); + stringBuilder.append(DEV_INDEX + "='").append(deviceIndexes.get(i)).append("')"); } } //sql语句 - String sql = "SELECT (SUM(" + InfluxDBPublicParam.ONLINEMIN + ")/(SUM(" + InfluxDBPublicParam.OFFLINEMIN + ")+SUM(" + InfluxDBPublicParam.ONLINEMIN + ")))*100 AS onlineRate FROM "+InfluxDBPublicParam.PQS_ONLINERATE+" WHERE " + stringBuilder + " group by " + InfluxDBPublicParam.DEV_INDEX +" tz('Asia/Shanghai')"; + String sql = "SELECT (SUM(" + ONLINE_MIN + ")/(SUM(" + OFFLINE_MIN + ")+SUM(" + ONLINE_MIN + ")))*100 AS onlineRate FROM "+PQS_ONLINERATE+" WHERE " + stringBuilder + " group by " + DEV_INDEX +" tz('Asia/Shanghai')"; //结果集 QueryResult result = influxDbUtils.query(sql); //处理结果集 @@ -163,7 +164,7 @@ public class OnlineRateDataServiceImpl implements OnlineRateDataService { list.forEach(po -> { PublicDTO publicDTO = new PublicDTO(); List> valueList = po.getValues(); - String index = po.getTags().get(InfluxDBPublicParam.DEV_INDEX); + String index = po.getTags().get(DEV_INDEX); if (!CollectionUtils.isEmpty(valueList)) { for (List value : valueList) { //终端在线率 保留两位小数 diff --git a/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/TerminalServiceImpl.java b/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/TerminalServiceImpl.java index 50c6dbdc5..7629773fc 100644 --- a/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/TerminalServiceImpl.java +++ b/pqs-harmonic/harmonic-boot/src/main/java/com/njcn/harmonic/service/impl/TerminalServiceImpl.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; +import static com.njcn.influxdb.param.InfluxDBPublicParam.*; + /** * @author: chenchao * @date: 2022/03/02 19:25 @@ -216,7 +218,7 @@ public class TerminalServiceImpl implements TerminalService { } } //sql语句 - String sql = "SELECT (SUM(" + InfluxDBPublicParam.ONLINEMIN + ")/(SUM(" + InfluxDBPublicParam.OFFLINEMIN + ")+SUM(" + InfluxDBPublicParam.ONLINEMIN + ")))*100 AS onlineRate FROM "+InfluxDBPublicParam.PQS_ONLINERATE+" WHERE " + stringBuilder + " group by " + InfluxDBPublicParam.DEV_INDEX + InfluxDBPublicParam.TIME_ZONE; + String sql = "SELECT (SUM(" + InfluxDBPublicParam.ONLINE_MIN + ")/(SUM(" + InfluxDBPublicParam.OFFLINE_MIN + ")+SUM(" + InfluxDBPublicParam.ONLINE_MIN + ")))*100 AS onlineRate FROM "+InfluxDBPublicParam.PQS_ONLINERATE+" WHERE " + stringBuilder + " group by " + InfluxDBPublicParam.DEV_INDEX + InfluxDBPublicParam.TIME_ZONE; //结果集 QueryResult result = influxDbUtils.query(sql); //处理结果集 diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsOnlineRateJob.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsOnlineRateJob.java index c5a22465f..d6c9260f7 100644 --- a/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsOnlineRateJob.java +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/handler/PqsOnlineRateJob.java @@ -1,8 +1,40 @@ package com.njcn.executor.handler; +import com.njcn.common.pojo.constant.PatternRegex; +import com.njcn.device.api.LineFeignClient; +import com.njcn.energy.pojo.constant.ModelState; +import com.njcn.executor.pojo.vo.PqsCommunicate; +import com.njcn.executor.pojo.vo.PqsOnlineRate; +import com.njcn.influxdb.utils.InfluxDbUtils; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.QueryResult; +import org.influxdb.impl.InfluxDBResultMapper; +import org.influxdb.querybuilder.SelectQueryImpl; +import org.influxdb.querybuilder.WhereNested; +import org.influxdb.querybuilder.WhereQueryImpl; +import org.influxdb.querybuilder.clauses.Clause; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static com.njcn.influxdb.param.InfluxDBPublicParam.*; +import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*; /** * 类的介绍: @@ -16,14 +48,234 @@ import org.springframework.stereotype.Component; @AllArgsConstructor public class PqsOnlineRateJob { + private final InfluxDbUtils influxDbUtils; + private final LineFeignClient lineFeignClient; + private final Integer DAY_MINUTE= 60*24; + @XxlJob("pqsOnlineRateJobHandler") + public void pqsOnlineRateJobHandler() throws ParseException { + List result = new ArrayList<>(); + List paramList = new ArrayList<>(),deviceList = new ArrayList<>(); + String command = XxlJobHelper.getJobParam(); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Calendar calendar = Calendar.getInstance(); + calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH)-1, 0, 0, 0); + calendar.set(Calendar.MILLISECOND, 0); + Calendar calendar2 = Calendar.getInstance(); + calendar2.set(calendar2.get(Calendar.YEAR), calendar2.get(Calendar.MONTH), calendar2.get(Calendar.DAY_OF_MONTH)-1, 23, 59, 59); + calendar2.set(Calendar.MILLISECOND, 0); + String startTime = format.format(calendar.getTime()); + String endTime = format.format(calendar2.getTime()); + if (!StringUtils.isEmpty(command)){ + paramList = Arrays.asList(command.split(",")); + startTime = paramList.get(0); + endTime = paramList.get(1); + deviceList = paramList.subList(2,paramList.size()); + boolean s1 = Pattern.matches(PatternRegex.TIME_FORMAT,startTime); + boolean e1 = Pattern.matches(PatternRegex.TIME_FORMAT,endTime); + if (!s1 || !e1){ + log.error("补招时间格式错误"); + return; + } else { + startTime = startTime + " 00:00:00"; + endTime = endTime + " 23:59:59"; + } + } + if (CollectionUtils.isEmpty(deviceList)){ + deviceList = lineFeignClient.getDeviceList().getData(); + } + if (!CollectionUtils.isEmpty(deviceList)){ + long diff,diffDays,a,b = 0; + List l1 = new ArrayList<>(),l2 = new ArrayList<>(); + Date d1 = format.parse(startTime); + Date d2 = format.parse(endTime); + diff = d2.getTime() - d1.getTime(); + diffDays = diff / (24 * 60 * 60 * 1000-1000); + int days = (int) diffDays; + for (int i = 1; i <= days; i++) { + a = d1.getTime() + (long)(i-1)*(24 * 60 * 60) * 1000; + b = d1.getTime() + (long)i*(24 * 60 * 60) * 1000-1000; + startTime = format.format(a); + endTime = format.format(b); + //获取装置的最新的一条数据 + List latestList = getData(deviceList); + if (!CollectionUtils.isEmpty(latestList)){ + for (PqsCommunicate item : latestList) { + if (item.getTime().toEpochMilli() < a){ + l1.add(item); + } else if (a <= item.getTime().toEpochMilli() && item.getTime().toEpochMilli() < b){ + l2.add(item); + } + } + } + if (!CollectionUtils.isEmpty(l1)){ + for (PqsCommunicate item : l1) { + PqsOnlineRate onlineRate = new PqsOnlineRate(); + if (Objects.equals(item.getType(), ModelState.offline)){ + onlineRate.setOfflineMin(DAY_MINUTE); + onlineRate.setOnlineMin(0); + onlineRate.setOnlineRate(0.0); + } else { + onlineRate.setOfflineMin(0); + onlineRate.setOnlineMin(DAY_MINUTE); + onlineRate.setOnlineRate(100.0); + } + onlineRate.setTime(Instant.ofEpochMilli(a)); + onlineRate.setDevId(item.getDevId()); + result.add(onlineRate); + } + } + if (!CollectionUtils.isEmpty(l2)){ + List devList = l2.stream().map(PqsCommunicate::getDevId).collect(Collectors.toList()); + List list = getPqsCommunicate(devList,startTime,endTime); + //根据装置的id进行分组 + Map> groupMap = list.stream().collect(Collectors.groupingBy(PqsCommunicate::getDevId)); + try { + if (!CollectionUtils.isEmpty(groupMap)){ + for (String key : groupMap.keySet()) { + int offTime = 0; + int onTime = 0; + PqsOnlineRate onlineRate = new PqsOnlineRate(); + List infoList = groupMap.get(key); + if (infoList.size() > 1){ + //获取最早一条记录 + PqsCommunicate first = infoList.stream().min(Comparator.comparing(PqsCommunicate::getTime)).get(); + //将上线和下线分组 + Map> typeMap = infoList.stream().collect(Collectors.groupingBy(PqsCommunicate::getType)); + List off = typeMap.get(0); + List on = typeMap.get(1); + if (first.getType() == 0){ + if (off.size() == on.size()){ + for (int j = 0; j < off.size(); j++) { + offTime = offTime + (int) (on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60); + } + } else { + for (int j = 0; j < off.size(); j++) { + if (j == off.size() - 1){ + offTime = offTime + (int) (format.parse(endTime).getTime() + 1000L - off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60); + } else { + offTime = offTime + (int) (on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60); + } + } + } + onTime = DAY_MINUTE-offTime; + } else { + if (off.size() == on.size()){ + for (int j = 0; j < on.size(); j++) { + onTime = onTime + (int) (off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60); + } + } else { + for (int j = 0; j < on.size(); j++) { + if (j == on.size() - 1){ + onTime = onTime + (int) (format.parse(endTime).getTime() + 1000L - on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60); + } else { + onTime = onTime + (int) (off.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + 1000L - on.get(j).getTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60); + } + } + } + offTime = DAY_MINUTE-onTime; + } + } else { + LocalDateTime updateTime = LocalDateTime.ofInstant(infoList.get(0).getTime(), ZoneId.systemDefault()); + if (Objects.equals(infoList.get(0).getType(),0)) { + onTime = 0; + offTime = (int) (format.parse(endTime).getTime() + 1000L - updateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60); + } else { + offTime = 0; + onTime = (int) (format.parse(endTime).getTime() + 1000L - updateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) / (1000*60); + } + } + onlineRate.setDevId(infoList.get(0).getDevId()); + onlineRate.setOnlineMin(onTime); + onlineRate.setOfflineMin(offTime); + onlineRate.setOnlineRate(Double.parseDouble(String.format("%.2f",onTime*1.0/DAY_MINUTE*100))); + onlineRate.setTime(Instant.ofEpochMilli(a)); + result.add(onlineRate); + } + } + } catch (ParseException e) { + e.getMessage(); + } + } + } + } + insertData(result); + } + /** + * 获取pqs_communicate数据 + * @param list 装置集合 + * @param startTime 开始时间 + * @param endTime 结束时间 + * @return pqs_communicate数据 + */ + private List getPqsCommunicate(List list, String startTime, String endTime){ + SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE); + WhereQueryImpl where = selectQuery.where(); + whereAndNested(list, where); + where.and(gte(TIME, startTime)).and(lte(TIME, endTime)); + where.tz(TZ); + QueryResult queryResult = influxDbUtils.query(selectQuery.getCommand()); + InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); + return resultMapper.toPOJO(queryResult, PqsCommunicate.class); + } + /** + * 拼接装置条件 + * @param list 装置集合 + * @param whereQuery WhereQueryImpl + */ + private void whereAndNested(List list, WhereQueryImpl whereQuery) { + List clauses = new ArrayList<>(); + list.forEach(item->{ + Clause clause = eq(DEV_ID, item); + clauses.add(clause); + }); + WhereNested> andNested = whereQuery.andNested(); + for (Clause clause : clauses) { + andNested.or(clause); + } + andNested.close(); + } + /** + * 获取pqs_communicate数据最新一条数据 + * @param list 装置id集合 + * @return pqs_communicate数据 + */ + private List getData(List list){ + SelectQueryImpl selectQuery = select().from(DATABASE, PQS_COMMUNICATE); + WhereQueryImpl where = selectQuery.where(); + whereAndNested(list, where); + where.groupBy(DEV_ID).orderBy(desc()).limit(1); + where.tz(TZ); + QueryResult queryResult = influxDbUtils.query(selectQuery.getCommand()); + InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); + return resultMapper.toPOJO(queryResult, PqsCommunicate.class); + } - - - + /** + * 功能描述:插入pqs_integrity表数据 + * @author xy + * @param list 数据集合 + * @date 2022/5/12 8:55 + */ + private void insertData(List list){ + List records = new ArrayList<>(); + list.forEach(item->{ + Map tags = new HashMap<>(); + Map fields = new HashMap<>(); + tags.put(DEV_ID,item.getDevId()); + fields.put(ONLINE_MIN,item.getOnlineMin()); + fields.put(OFFLINE_MIN,item.getOfflineMin()); + fields.put(ONLINE_RATE,item.getOnlineRate()); + Point point = influxDbUtils.pointBuilder(PQS_ONLINERATE, item.getTime().toEpochMilli(), TimeUnit.MILLISECONDS, tags, fields); + BatchPoints batchPoints = BatchPoints.database(DATABASE).tag(DEV_ID, item.getDevId()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); + batchPoints.point(point); + records.add(batchPoints.lineProtocol()); + }); + influxDbUtils.batchInsert(DATABASE,"", InfluxDB.ConsistencyLevel.ALL, records); + } } diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/pojo/vo/PqsCommunicate.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/pojo/vo/PqsCommunicate.java new file mode 100644 index 000000000..dadf9aff2 --- /dev/null +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/pojo/vo/PqsCommunicate.java @@ -0,0 +1,32 @@ +package com.njcn.executor.pojo.vo; + +import lombok.Data; +import org.influxdb.annotation.Column; +import org.influxdb.annotation.Measurement; + +import java.time.Instant; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2022/7/12 9:55 + */ +@Data +@Measurement(name = "pqs_communicate") +public class PqsCommunicate { + + @Column(name = "time") + private Instant time; + + @Column(name = "dev_id") + private String devId; + + @Column(name = "description") + private String description; + + @Column(name = "type") + private Integer type; + +} diff --git a/pqs-job/job-executor/src/main/java/com/njcn/executor/pojo/vo/PqsOnlineRate.java b/pqs-job/job-executor/src/main/java/com/njcn/executor/pojo/vo/PqsOnlineRate.java new file mode 100644 index 000000000..0e1b38681 --- /dev/null +++ b/pqs-job/job-executor/src/main/java/com/njcn/executor/pojo/vo/PqsOnlineRate.java @@ -0,0 +1,35 @@ +package com.njcn.executor.pojo.vo; + +import lombok.Data; +import org.influxdb.annotation.Column; +import org.influxdb.annotation.Measurement; + +import java.time.Instant; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2022/7/12 9:55 + */ +@Data +@Measurement(name = "pqs_onlinerate") +public class PqsOnlineRate { + + @Column(name = "time") + private Instant time; + + @Column(name = "dev_id") + private String devId; + + @Column(name = "online_min") + private Integer onlineMin; + + @Column(name = "offline_min") + private Integer offlineMin; + + @Column(name = "online_rate") + private Double onlineRate; + +} diff --git a/pqs-quality/quality-boot/src/main/resources/bootstrap.yml b/pqs-quality/quality-boot/src/main/resources/bootstrap.yml index 1d01fc4aa..879619902 100644 --- a/pqs-quality/quality-boot/src/main/resources/bootstrap.yml +++ b/pqs-quality/quality-boot/src/main/resources/bootstrap.yml @@ -21,11 +21,11 @@ spring: cloud: nacos: discovery: - server-addr: 192.168.1.18:18848 -# namespace: @nacos.namespace@ + server-addr: @nacos.url@ + namespace: @nacos.namespace@ config: - server-addr: 192.168.1.18:18848 -# namespace: @nacos.namespace@ + server-addr: @nacos.url@ + namespace: @nacos.namespace@ file-extension: yaml shared-configs: - data-id: share-config.yaml @@ -38,8 +38,7 @@ spring: #项目日志的配置 logging: -# config: http://@nacos.url@/nacos/v1/cs/configs?tenant=@nacos.namespace@&group=DEFAULT_GROUP&dataId=logback.xml - config: http://192.168.1.18:18848/nacos/v1/cs/configs?group=DEFAULT_GROUP&dataId=logback.xml + config: http://@nacos.url@/nacos/v1/cs/configs?tenant=@nacos.namespace@&group=DEFAULT_GROUP&dataId=logback.xml level: root: info