diff --git a/pqs-energy/energy-api/pom.xml b/pqs-energy/energy-api/pom.xml index 5558d940d..23cec5c9c 100644 --- a/pqs-energy/energy-api/pom.xml +++ b/pqs-energy/energy-api/pom.xml @@ -43,6 +43,18 @@ com.google.code.gson gson + + + com.aliyun + aliyun-java-sdk-core + 4.1.0 + + + com.aliyun + aliyun-java-sdk-dysmsapi + 1.1.0 + + diff --git a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EleAirStrategyFeignClient.java b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EleAirStrategyFeignClient.java index d9ba91688..829574514 100644 --- a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EleAirStrategyFeignClient.java +++ b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EleAirStrategyFeignClient.java @@ -1,9 +1,7 @@ package com.njcn.energy.pojo.api; import com.njcn.common.pojo.constant.ServerInfo; -import com.njcn.common.pojo.response.HttpResult; import com.njcn.energy.pojo.api.fallback.EleAirStrategyFallbackFactory; -import com.njcn.energy.pojo.api.fallback.EleOnlineRateFallbackFactory; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -24,6 +22,6 @@ public interface EleAirStrategyFeignClient { * @date 2022/4/22 */ @GetMapping("dealAirStrategyId") - HttpResult dealAirStrategyId(@RequestParam("operation") String operation); + void dealAirStrategyId(@RequestParam("operation") String operation); } diff --git a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EnergyStatisticFeignClient.java b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EnergyStatisticFeignClient.java index eb458458b..e708ec83c 100644 --- a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EnergyStatisticFeignClient.java +++ b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/EnergyStatisticFeignClient.java @@ -1,11 +1,9 @@ package com.njcn.energy.pojo.api; import com.njcn.common.pojo.constant.ServerInfo; -import com.njcn.common.pojo.response.HttpResult; import com.njcn.energy.pojo.api.fallback.EleOnlineRateFallbackFactory; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestParam; /** @@ -23,12 +21,12 @@ public interface EnergyStatisticFeignClient { * @date 2022/4/22 */ @GetMapping("electricCalJob") - HttpResult electricCalJob(); + void electricCalJob(); @GetMapping("eleOnlineRateJob") - HttpResult eleOnlineRateJobHandler(); + void eleOnlineRateJobHandler(); @GetMapping("eleIntegrityJob") - HttpResult eleIntegrityJobHandler(); + void eleIntegrityJobHandler(); } diff --git a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EleAirStrategyFallbackFactory.java b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EleAirStrategyFallbackFactory.java index 15a9b4bd2..916a06eb9 100644 --- a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EleAirStrategyFallbackFactory.java +++ b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EleAirStrategyFallbackFactory.java @@ -2,7 +2,6 @@ package com.njcn.energy.pojo.api.fallback; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; -import com.njcn.common.pojo.response.HttpResult; import com.njcn.energy.pojo.api.EleAirStrategyFeignClient; import com.njcn.energy.pojo.utils.EnergyEnumUtil; import feign.hystrix.FallbackFactory; @@ -33,7 +32,7 @@ public class EleAirStrategyFallbackFactory implements FallbackFactory finalExceptionEnum = exceptionEnum; return new EleAirStrategyFeignClient() { @Override - public HttpResult dealAirStrategyId(String operation) { + public void dealAirStrategyId(String operation) { log.error("{}异常,降级处理,异常为:{}","空调控制策略数据",cause.toString()); throw new BusinessException(finalExceptionEnum); } diff --git a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EnergyStatisticFallbackFactory.java b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EnergyStatisticFallbackFactory.java index 7dae7aec3..205a2d002 100644 --- a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EnergyStatisticFallbackFactory.java +++ b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/api/fallback/EnergyStatisticFallbackFactory.java @@ -2,8 +2,6 @@ package com.njcn.energy.pojo.api.fallback; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; -import com.njcn.common.pojo.response.HttpResult; -import com.njcn.energy.pojo.api.EleAirStrategyFeignClient; import com.njcn.energy.pojo.api.EnergyStatisticFeignClient; import com.njcn.energy.pojo.utils.EnergyEnumUtil; import feign.hystrix.FallbackFactory; @@ -34,19 +32,19 @@ public class EnergyStatisticFallbackFactory implements FallbackFactory finalExceptionEnum = exceptionEnum; return new EnergyStatisticFeignClient() { @Override - public HttpResult electricCalJob() { + public void electricCalJob() { log.error("{}异常,降级处理,异常为:{}","调度统计电量增量",cause.toString()); throw new BusinessException(finalExceptionEnum); } @Override - public HttpResult eleOnlineRateJobHandler() { + public void eleOnlineRateJobHandler() { log.error("{}异常,降级处理,异常为:{}","用能终端在线率数据",cause.toString()); throw new BusinessException(finalExceptionEnum); } @Override - public HttpResult eleIntegrityJobHandler() { + public void eleIntegrityJobHandler() { log.error("{}异常,降级处理,异常为:{}","用能数据完整性数据",cause.toString()); throw new BusinessException(finalExceptionEnum); } diff --git a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/constant/SmsConstant.java b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/constant/SmsConstant.java new file mode 100644 index 000000000..db1eeafec --- /dev/null +++ b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/constant/SmsConstant.java @@ -0,0 +1,19 @@ +package com.njcn.energy.pojo.constant; + +public interface SmsConstant { + + String DEFAULT_CONNECT_TIME_OUT = "sun.net.client.defaultConnectTimeout"; + String DEFAULT_READ_TIME_OUT = "sun.net.client.defaultReadTimeout"; + //短信API产品名称(短信产品名固定,无需修改) + String PRODUCT = "Dysmsapi"; + //短信API产品域名(接口地址固定,无需修改) + String DOMAIN = "dysmsapi.aliyuncs.com"; + //accessKeyId + String ACCESS_KEY_ID = "LTAI4FxsR76x2dq3w9c5puUe"; + //accessKeySecret + String ACCESS_KEY_SECRET = "GxkTR8fsrvHtixTlD9UPmOGli35tZs"; + //短信所属地 + String LOCATION = "cn-hangzhou"; + //签名 + String SGIN = "灿能云"; +} diff --git a/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/po/PvPowerStrategy.java b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/po/PvPowerStrategy.java new file mode 100644 index 000000000..2092309b8 --- /dev/null +++ b/pqs-energy/energy-api/src/main/java/com/njcn/energy/pojo/po/PvPowerStrategy.java @@ -0,0 +1,20 @@ +package com.njcn.energy.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +/** + * @Author: cdf + * @CreateTime: 2025-05-12 + * @Description: 光伏异常短信策略 + */ +@Data +@TableName(value = "pv_power_strategy") +public class PvPowerStrategy { + + private String id; + + private Double alarmVal; + + private String sendPhone; +} diff --git a/pqs-energy/energy-boot/src/main/java/com/njcn/energy/controller/StatisticDataRunController.java b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/controller/StatisticDataRunController.java index cb2c62088..e3279fb1c 100644 --- a/pqs-energy/energy-boot/src/main/java/com/njcn/energy/controller/StatisticDataRunController.java +++ b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/controller/StatisticDataRunController.java @@ -16,7 +16,6 @@ import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.QueryResult; -import org.springframework.scheduling.annotation.Async; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -52,7 +51,6 @@ public class StatisticDataRunController { * 调度统计电量增量 */ @GetMapping("electricCalJob") - @Async("asyncExecutor") public void electricCalJob() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Calendar startTime = Calendar.getInstance(); diff --git a/pqs-energy/energy-boot/src/main/java/com/njcn/energy/controller/SunPowerScheduledController.java b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/controller/SunPowerScheduledController.java new file mode 100644 index 000000000..165e4fdb4 --- /dev/null +++ b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/controller/SunPowerScheduledController.java @@ -0,0 +1,181 @@ +package com.njcn.energy.controller; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.NumberUtil; +import cn.hutool.core.util.StrUtil; +import com.aliyuncs.DefaultAcsClient; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest; +import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse; +import com.aliyuncs.http.MethodType; +import com.aliyuncs.profile.DefaultProfile; +import com.aliyuncs.profile.IClientProfile; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.njcn.energy.mapper.EleEpdMapper; +import com.njcn.energy.mapper.EleLoadSetMapper; +import com.njcn.energy.mapper.PvPowerStrategyMapper; +import com.njcn.energy.pojo.constant.SmsConstant; +import com.njcn.energy.pojo.po.EleEpdPqd; +import com.njcn.energy.pojo.po.PvPowerStrategy; +import com.njcn.influx.utils.InfluxDbUtils; +import com.njcn.system.api.DicDataFeignClient; +import com.njcn.system.enums.DicDataTypeEnum; +import com.njcn.system.pojo.po.DictData; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * @Author: cdf + * @CreateTime: 2025-05-12 + * @Description: + */ +@EnableScheduling +@Slf4j +@RequiredArgsConstructor +@RestController +@Api(tags = "光伏异常运行定时任务") +public class SunPowerScheduledController { + + private final DicDataFeignClient dicDataFeignClient; + + private final EleLoadSetMapper eleLoadSetMapper; + + private final EleEpdMapper eleEpdMapper; + + private final PvPowerStrategyMapper pvPowerStrategyMapper; + + private static final String PARK_POWER_NOTICE = "SMS_485465792"; + + private final InfluxDbUtils influxDbUtils; + + @Scheduled(cron = "0 50 8 * * ?") + @GetMapping("sunBadSendMessage") + public void sunBadSendMessage() { + try { + DateTime yesterday = DateUtil.yesterday(); + String begin = DateUtil.format(DateUtil.beginOfDay(yesterday), DatePattern.NORM_DATETIME_PATTERN); + String end = DateUtil.format(DateUtil.endOfDay(yesterday), DatePattern.NORM_DATETIME_PATTERN); + EleEpdPqd eleEpdPqd = eleEpdMapper.selectById("64592326d968dd07ffc0437befcfa026"); + if(Objects.isNull(eleEpdPqd)){ + log.error("获取净有功电能配置失败,任务终止"); + return; + } + DictData dictData = dicDataFeignClient.getDicDataByCodeAndType("Photovoltaic", DicDataTypeEnum.ELE_LOAD_TYPE.getCode()).getData(); + if(Objects.isNull(dictData)){ + log.error("获取光伏字典失败,任务终止"); + return; + } + List lineIdList = eleLoadSetMapper.getAllLineListRelation(dictData.getId()); + log.info("获取监测点信息{}",lineIdList); + if (CollectionUtil.isNotEmpty(lineIdList)) { + StringBuilder lineId = new StringBuilder(); + for (int i = 0; i < lineIdList.size(); i++) { + if (i == lineIdList.size() - 1) { + lineId.append("'").append(lineIdList.get(i)).append("'"); + } else { + lineId.append("'").append(lineIdList.get(i)).append("'").append(" or LineId="); + } + } + StringBuilder sql = new StringBuilder("select "); + sql.append("SUM(").append(eleEpdPqd.getName()).append(") as ").append(eleEpdPqd.getName()).append(" from ").append(eleEpdPqd.getClassId()).append(" where (LineId = ").append(lineId).append(") and Phase= '").append(eleEpdPqd.getPhase()) + .append("' and time>='").append(begin).append("' and time<= '").append(end).append("' and ").append(eleEpdPqd.getName()).append(" !=-1000000").append(" group by time(15m)"); + log.info("执行sql-----------{}",sql); + List> mapList = influxDbUtils.getResult(sql.toString(), ""); + + if (CollectionUtil.isNotEmpty(mapList)) { + List> temList = mapList.stream().filter(it->Objects.nonNull(it.get("WattHr"))).collect(Collectors.toList()); + log.info("返回结果-----------{}",+temList.size()); + if(temList.size()<10){ + //防止数据量太少导致的异常发送 + log.error("power_data表统计日数据量太少,判定为异常,任务终止"); + return; + } + + double max = temList.stream().mapToDouble(it->Double.parseDouble(it.get("WattHr").toString())).max().orElse(3.14159); + double min = temList.stream().mapToDouble(it->Double.parseDouble(it.get("WattHr").toString())).min().orElse(3.14159); + if(max != 3.14159 && min != 3.14159){ + PvPowerStrategy pvPowerStrategy = pvPowerStrategyMapper.selectOne(new LambdaQueryWrapper<>()); + if(Objects.isNull(pvPowerStrategy)){ + log.error("表pv_power_strategy_光伏发电配置查询为空,任务终止"); + return; + } + double val = NumberUtil.round(max-min,3).doubleValue(); + log.info("当天发电量:{} kwh",+val); + if(val < pvPowerStrategy.getAlarmVal()){ + log.info("净电度差值 {} 低于阈值 {}, 准备发送报警短信", val, pvPowerStrategy.getAlarmVal()); + String[] phoneArr = pvPowerStrategy.getSendPhone().split(StrUtil.COMMA); + for(String ph : phoneArr){ + sendMobileMessage(DateUtil.format(yesterday,DatePattern.NORM_DATE_PATTERN),String.valueOf(val), ph); + } + }else { + log.info("净电度差值 {} 高于阈值 {}, 无需报警", val, pvPowerStrategy.getAlarmVal()); + } + } + } + } + } catch (Exception exception) { + exception.printStackTrace(); + } + } + + /** + * 发送手机短信 + * + * @return 推送的响应 + */ + private static SendSmsResponse sendMobileMessage(String date, String value, String phone) { + SendSmsResponse sendSmsResponse = new SendSmsResponse(); + //开始执行短信发送 + try { + //设置超时时间-可自行调整 + System.setProperty(SmsConstant.DEFAULT_CONNECT_TIME_OUT, "10000"); + System.setProperty(SmsConstant.DEFAULT_READ_TIME_OUT, "10000"); + IClientProfile profile = DefaultProfile.getProfile(SmsConstant.LOCATION, SmsConstant.ACCESS_KEY_ID, SmsConstant.ACCESS_KEY_SECRET); + DefaultProfile.addEndpoint(SmsConstant.LOCATION, SmsConstant.LOCATION, SmsConstant.PRODUCT, SmsConstant.DOMAIN); + IAcsClient acsClient = new DefaultAcsClient(profile); + SendSmsRequest request = new SendSmsRequest(); + request.setMethod(MethodType.POST); + request.setPhoneNumbers(phone); + //必填:短信签名-可在短信控制台中找到 + request.setSignName(SmsConstant.SGIN); + //必填:短信模板-可在短信控制台中找到,发送国际/港澳台消息时,请使用国际/港澳台短信模版 + request.setTemplateCode(PARK_POWER_NOTICE); + String messgae = "{\"date\":\"" + date+ "\",\"value\":\"" + value + "\"}"; + request.setTemplateParam(messgae); + //请求失败这里会抛ClientException异常 + sendSmsResponse = acsClient.getAcsResponse(request); + if (sendSmsResponse.getCode() != null && sendSmsResponse.getCode().equals("OK")) { + //成功发送短信验证码后,保存进redis + log.info("给用户发送光伏发电异常短信成功,手机号为:{}", phone); + } else { + log.info("给用户发送光伏发电异常短信失败,手机号为:{},失败原因为:{}", phone, sendSmsResponse.getMessage()); + } + } catch (Exception e) { + log.error("用能光伏发电自动定时任务发送短信异常:" + e.getMessage()); + } + return sendSmsResponse; + } + + + public static void main(String[] args) { + try { + sendMobileMessage("2025-05-11","8", "18815508963"); + } catch (Exception exception) { + exception.printStackTrace(); + } + } +} + diff --git a/pqs-energy/energy-boot/src/main/java/com/njcn/energy/mapper/PvPowerStrategyMapper.java b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/mapper/PvPowerStrategyMapper.java new file mode 100644 index 000000000..8f82145ac --- /dev/null +++ b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/mapper/PvPowerStrategyMapper.java @@ -0,0 +1,13 @@ +package com.njcn.energy.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.energy.pojo.po.PvPowerStrategy; + +/** + * pqs + * + * @author cdf + * @date 2022/4/21 + */ +public interface PvPowerStrategyMapper extends BaseMapper { +} diff --git a/pqs-energy/energy-boot/src/main/java/com/njcn/energy/scheduled/SunPowerScheduled.java b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/scheduled/SunPowerScheduled.java new file mode 100644 index 000000000..f363831e3 --- /dev/null +++ b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/scheduled/SunPowerScheduled.java @@ -0,0 +1,188 @@ +//package com.njcn.energy.scheduled; +// +//import cn.hutool.core.collection.CollectionUtil; +//import cn.hutool.core.date.DatePattern; +//import cn.hutool.core.date.DateTime; +//import cn.hutool.core.date.DateUtil; +//import cn.hutool.core.util.StrUtil; +//import com.aliyuncs.DefaultAcsClient; +//import com.aliyuncs.IAcsClient; +//import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest; +//import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse; +//import com.aliyuncs.http.MethodType; +//import com.aliyuncs.profile.DefaultProfile; +//import com.aliyuncs.profile.IClientProfile; +//import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +//import com.njcn.common.pojo.enums.response.CommonResponseEnum; +//import com.njcn.common.pojo.exception.BusinessException; +//import com.njcn.energy.mapper.EleEpdMapper; +//import com.njcn.energy.mapper.EleLoadSetMapper; +//import com.njcn.energy.mapper.EleMdMapper; +//import com.njcn.energy.mapper.PvPowerStrategyMapper; +//import com.njcn.energy.pojo.constant.SmsConstant; +//import com.njcn.energy.pojo.po.EleEpdPqd; +//import com.njcn.energy.pojo.po.EleMd; +//import com.njcn.energy.pojo.po.PvPowerStrategy; +//import com.njcn.influx.utils.InfluxDbUtils; +//import com.njcn.system.api.DicDataFeignClient; +//import com.njcn.system.enums.DicDataTypeEnum; +//import com.njcn.system.pojo.po.DictData; +//import com.njcn.user.enums.MessageEnum; +//import io.swagger.annotations.Api; +//import lombok.RequiredArgsConstructor; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.scheduling.annotation.EnableScheduling; +//import org.springframework.scheduling.annotation.Scheduled; +//import org.springframework.web.bind.annotation.GetMapping; +//import org.springframework.web.bind.annotation.RestController; +// +//import java.math.BigDecimal; +//import java.math.RoundingMode; +//import java.time.LocalDateTime; +//import java.util.ArrayList; +//import java.util.List; +//import java.util.Map; +//import java.util.Objects; +//import java.util.stream.Collectors; +// +///** +// * @Author: cdf +// * @CreateTime: 2025-05-09 +// * @Description: 光伏异常短信通知 +// */ +//@EnableScheduling +//@Slf4j +//@RequiredArgsConstructor +//@RestController +//@Api(tags = "定时任务") +//public class SunPowerScheduled { +// +// private final DicDataFeignClient dicDataFeignClient; +// +// private final EleLoadSetMapper eleLoadSetMapper; +// +// private final EleEpdMapper eleEpdMapper; +// +// private final PvPowerStrategyMapper pvPowerStrategyMapper; +// +// private static final String PARK_POWER_NOTICE = "SMS_485465792"; +// +// private final InfluxDbUtils influxDbUtils; +// +// @Scheduled(cron = "0 50 8 * * ?") +// //@GetMapping("cnSunBadSendMessage") +// public void cnSunBadSendMessage() { +// try { +// DateTime yesterday = DateUtil.yesterday(); +// String begin = DateUtil.format(DateUtil.beginOfDay(yesterday), DatePattern.NORM_DATETIME_PATTERN); +// String end = DateUtil.format(DateUtil.endOfDay(yesterday), DatePattern.NORM_DATETIME_PATTERN); +// EleEpdPqd eleEpdPqd = eleEpdMapper.selectById("64592326d968dd07ffc0437befcfa026"); +// if(Objects.isNull(eleEpdPqd)){ +// log.error("获取净有功电能配置失败,任务终止"); +// return; +// } +// DictData dictData = dicDataFeignClient.getDicDataByCodeAndType("Photovoltaic",DicDataTypeEnum.ELE_LOAD_TYPE.getCode()).getData(); +// if(Objects.isNull(dictData)){ +// log.error("获取光伏字典失败,任务终止"); +// return; +// } +// List lineIdList = eleLoadSetMapper.getAllLineListRelation(dictData.getId()); +// log.info("获取监测点信息{}",lineIdList); +// if (CollectionUtil.isNotEmpty(lineIdList)) { +// StringBuilder lineId = new StringBuilder(); +// for (int i = 0; i < lineIdList.size(); i++) { +// if (i == lineIdList.size() - 1) { +// lineId.append("'").append(lineIdList.get(i)).append("'"); +// } else { +// lineId.append("'").append(lineIdList.get(i)).append("'").append(" or LineId="); +// } +// } +// StringBuilder sql = new StringBuilder("select "); +// sql.append("SUM(").append(eleEpdPqd.getName()).append(") as ").append(eleEpdPqd.getName()).append(" from ").append(eleEpdPqd.getClassId()).append(" where (LineId = ").append(lineId).append(") and Phase= '").append(eleEpdPqd.getPhase()) +// .append("' and time>='").append(begin).append("' and time<= '").append(end).append("' and ").append(eleEpdPqd.getName()).append(" !=-1000000").append(" group by time(15m)"); +// log.info("执行sql-----------{}",sql); +// List> mapList = influxDbUtils.getResult(sql.toString(), ""); +// +// if (CollectionUtil.isNotEmpty(mapList)) { +// List> temList = mapList.stream().filter(it->Objects.nonNull(it.get("WattHr"))).collect(Collectors.toList()); +// log.info("返回结果-----------{}",+temList.size()); +// if(temList.size()<30){ +// //防止数据量太少导致的异常发送 +// log.error("power_data表统计日数据量太少,判定为异常,任务终止"); +// return; +// } +// +// double max = temList.stream().mapToDouble(it->Double.parseDouble(it.get("WattHr").toString())).max().orElse(3.14159); +// double min = temList.stream().mapToDouble(it->Double.parseDouble(it.get("WattHr").toString())).min().orElse(3.14159); +// if(max != 3.14159 && min != 3.14159){ +// PvPowerStrategy pvPowerStrategy = pvPowerStrategyMapper.selectOne(new LambdaQueryWrapper<>()); +// if(Objects.isNull(pvPowerStrategy)){ +// log.error("表pv_power_strategy_光伏发电配置查询为空,任务终止"); +// return; +// } +// double val = new BigDecimal(max-min).setScale(3, RoundingMode.HALF_UP).doubleValue(); +// log.info("当天发电量:{} kwh",+val); +// if(val < pvPowerStrategy.getAlarmVal()){ +// log.info("净电度差值 {} 低于阈值 {}, 准备发送报警短信", val, pvPowerStrategy.getAlarmVal()); +// String[] phoneArr = pvPowerStrategy.getSendPhone().split(StrUtil.COMMA); +// for(String ph : phoneArr){ +// //sendMobileMessage(DateUtil.format(yesterday,DatePattern.NORM_DATE_PATTERN),String.valueOf(val), ph); +// } +// }else { +// log.info("净电度差值 {} 高于阈值 {}, 无需报警", val, pvPowerStrategy.getAlarmVal()); +// } +// } +// } +// } +// } catch (Exception exception) { +// exception.printStackTrace(); +// } +// } +// +// /** +// * 发送手机短信 +// * +// * @return 推送的响应 +// */ +// private static SendSmsResponse sendMobileMessage(String date,String value,String phone) { +// SendSmsResponse sendSmsResponse = new SendSmsResponse(); +// //开始执行短信发送 +// try { +// //设置超时时间-可自行调整 +// System.setProperty(SmsConstant.DEFAULT_CONNECT_TIME_OUT, "10000"); +// System.setProperty(SmsConstant.DEFAULT_READ_TIME_OUT, "10000"); +// IClientProfile profile = DefaultProfile.getProfile(SmsConstant.LOCATION, SmsConstant.ACCESS_KEY_ID, SmsConstant.ACCESS_KEY_SECRET); +// DefaultProfile.addEndpoint(SmsConstant.LOCATION, SmsConstant.LOCATION, SmsConstant.PRODUCT, SmsConstant.DOMAIN); +// IAcsClient acsClient = new DefaultAcsClient(profile); +// SendSmsRequest request = new SendSmsRequest(); +// request.setMethod(MethodType.POST); +// request.setPhoneNumbers(phone); +// //必填:短信签名-可在短信控制台中找到 +// request.setSignName(SmsConstant.SGIN); +// //必填:短信模板-可在短信控制台中找到,发送国际/港澳台消息时,请使用国际/港澳台短信模版 +// request.setTemplateCode(PARK_POWER_NOTICE); +// String messgae = "{\"date\":\"" + date+ "\",\"value\":\"" + value + "\"}"; +// request.setTemplateParam(messgae); +// //请求失败这里会抛ClientException异常 +// sendSmsResponse = acsClient.getAcsResponse(request); +// if (sendSmsResponse.getCode() != null && sendSmsResponse.getCode().equals("OK")) { +// //成功发送短信验证码后,保存进redis +// log.info("给用户发送光伏发电异常短信成功,手机号为:{}", phone); +// } else { +// log.info("给用户发送光伏发电异常短信失败,手机号为:{},失败原因为:{}", phone, sendSmsResponse.getMessage()); +// } +// } catch (Exception e) { +// log.error("用能光伏发电自动定时任务发送短信异常:" + e.getMessage()); +// } +// return sendSmsResponse; +// } +// +// +// public static void main(String[] args) { +// try { +// sendMobileMessage("2025-05-11","8", "18815508963"); +// } catch (Exception exception) { +// exception.printStackTrace(); +// } +// } +//} diff --git a/pqs-energy/energy-boot/src/main/java/com/njcn/energy/service/impl/EleAirStrategyServiceImpl.java b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/service/impl/EleAirStrategyServiceImpl.java index cc4a5818e..0ec688374 100644 --- a/pqs-energy/energy-boot/src/main/java/com/njcn/energy/service/impl/EleAirStrategyServiceImpl.java +++ b/pqs-energy/energy-boot/src/main/java/com/njcn/energy/service/impl/EleAirStrategyServiceImpl.java @@ -38,6 +38,7 @@ import com.njcn.web.factory.PageFactory; import com.njcn.web.utils.RequestUtil; //import com.xxl.job.core.biz.model.ReturnT; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.influxdb.dto.QueryResult; import org.springframework.beans.BeanUtils; @@ -48,10 +49,10 @@ import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.List; -import java.util.Objects; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; /** @@ -62,8 +63,13 @@ import java.util.stream.Collectors; */ @Service @RequiredArgsConstructor +@Slf4j public class EleAirStrategyServiceImpl extends ServiceImpl implements EleAirStrategyService { + // 存储设备响应状态的共享Map(线程安全) + private final Map deviceResponseMap = new ConcurrentHashMap<>(); + + private final IEleSetService iEleSetService; //private final JobFeignClient jobFeignClient; @@ -458,153 +464,160 @@ public class EleAirStrategyServiceImpl extends ServiceImpl()).get(0); if (Objects.isNull(airStrategy)) { throw new BusinessException(CommonResponseEnum.FAIL); } - LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); lambdaQueryWrapper.eq(AirSet::getId, airStrategy.getId()); List list = iEleSetService.list(lambdaQueryWrapper); + if (CollectionUtil.isNotEmpty(list)) { List lineIds = list.stream().map(AirSet::getLineId).collect(Collectors.toList()); - //如果是关闭操作 + + // 如果是关闭操作,过滤出当前开启的空调 if ("close".equals(operation)) { StringBuilder temSql = assToInfluxParam(lineIds); String sql = "select LineId,ACInStat from air_data_Real where " + temSql + " and ACInStat = 'On'"; QueryResult result = influxDbUtils.query(sql); List listSeries = result.getResults().get(0).getSeries(); if (!CollectionUtils.isEmpty(listSeries)) { - List lineIdTem = new ArrayList<>(); - listSeries.get(0).getValues().forEach(item -> { - lineIdTem.add(item.get(1).toString()); - }); - lineIds = lineIdTem; + lineIds = listSeries.get(0).getValues().stream() + .map(item -> item.get(1).toString()) + .collect(Collectors.toList()); } } - System.out.println(lineIds); + List netDevList = logicDeviceLineMapper.getNetAndDevByLineIds(lineIds); - if (CollectionUtil.isNotEmpty(netDevList)) { - for (NetAndDevByLineDTO netAndDevByLineDTO : netDevList) { + // 提交任务到线程池,按顺序执行 + executor.submit(() -> { + for (NetAndDevByLineDTO device : netDevList) { + boolean success = false; + int retryCount = 0; - Gson gson = new Gson(); - String array = "ACInStat"; - String power = ""; - if ("open".equals(operation)) { - //开启空调 - for (int i = 0; i < 4; i++) { - if (i == 0) { - power = "On"; - array = "ACInStat"; - } else if (i == 1) { - power = airStrategy.getMode(); - array = "ACInMode"; - } else if (i == 2) { - power = airStrategy.getTemperature().toString(); - array = "ACInSetTmp"; - } else { - power = airStrategy.getWind(); - array = "ACInSpeed"; + // 每个设备独立重试3次 + while (retryCount < 3 && !success) { + try { + sendAirControlCommand(device, operation, airStrategy); + success = waitForDeviceResponse(device.getDevId()); // 等待响应 + retryCount++; + } catch (Exception e) { + retryCount++; + if (retryCount >= 3) { + System.err.println("设备 " + device.getDevId() + " 控制失败,已达最大重试次数"); + } } - AirStrategyDTO airStrategyDTO = new AirStrategyDTO(); - airStrategyDTO.setMid(1); - airStrategyDTO.setDeviceId(netAndDevByLineDTO.getDevId()); - airStrategyDTO.setUserId(clientId); - airStrategyDTO.setTimestamp(Calendar.getInstance().getTimeInMillis()); - airStrategyDTO.setExpire(-1); - airStrategyDTO.setType("CMD_DEV_CTRL"); - AirStrategyDTO.Param param = new AirStrategyDTO.Param(); - param.setLineId(netAndDevByLineDTO.getLineId()); - param.setAction(array); - param.setParm(power); - airStrategyDTO.setParam(param); - String json = gson.toJson(airStrategyDTO); - System.out.println(json); - String topic = "/platform/devcmd/v1/" + netAndDevByLineDTO.getNdid(); - publisher.send(topic, json, 1, false); - - - this.mqttSendCount = 0; - this.mqttJsonMsg = json; - this.mqttSendTopic = topic; - - } - } else { - //关闭空调 - power = "Off"; - AirStrategyDTO airStrategyDTO = new AirStrategyDTO(); - airStrategyDTO.setMid(1); - airStrategyDTO.setDeviceId(netAndDevByLineDTO.getDevId()); - airStrategyDTO.setUserId(clientId); - airStrategyDTO.setTimestamp(Calendar.getInstance().getTimeInMillis()); - airStrategyDTO.setExpire(-1); - airStrategyDTO.setType("CMD_DEV_CTRL"); - AirStrategyDTO.Param param = new AirStrategyDTO.Param(); - param.setLineId(netAndDevByLineDTO.getLineId()); - param.setAction(array); - param.setParm(power); - airStrategyDTO.setParam(param); - String json = gson.toJson(airStrategyDTO); - System.out.println(json); - String topic = "/platform/devcmd/v1/" + netAndDevByLineDTO.getNdid(); - publisher.send(topic, json, 1, false); - this.mqttSendCount = 0; - this.mqttJsonMsg = json; - this.mqttSendTopic = topic; - System.out.println("发送关空调控制"+LocalDateTime.now()); + if (!success) { + + log.error("跳过设备: " + device.getDevId() + "(控制失败或无响应)"); + } } - Thread.sleep(8000); - } - if ("close".equals(operation) && airStrategy.getType() == 0) { - AirStrategy air = new AirStrategy(); - air.setId(airStrategy.getId()); - air.setStatus(2); - airStrategyMapper.updateById(air); - } + // 所有设备控制完成后更新状态 + if ("close".equals(operation) && airStrategy.getType() == 0) { + AirStrategy update = new AirStrategy(); + update.setId(airStrategy.getId()); + update.setStatus(2); + airStrategyMapper.updateById(update); + } + + + }); } - - } - } catch (Exception e) { e.printStackTrace(); + } finally { + executor.shutdown(); // 关闭线程池 + deviceResponseMap.clear(); } - } - private Integer mqttSendCount = 0; - private String mqttJsonMsg; - private String mqttSendTopic; + private void sendAirControlCommand(NetAndDevByLineDTO device, String operation, AirStrategy airStrategy) { + Gson gson = new Gson(); + String power = "open".equals(operation) ? "On" : "Off"; + String array = "ACInStat"; + + AirStrategyDTO airStrategyDTO = new AirStrategyDTO(); + airStrategyDTO.setMid(1); + airStrategyDTO.setDeviceId(device.getDevId()); + airStrategyDTO.setUserId(clientId); + airStrategyDTO.setTimestamp(Calendar.getInstance().getTimeInMillis()); + airStrategyDTO.setExpire(-1); + airStrategyDTO.setType("CMD_DEV_CTRL"); + + AirStrategyDTO.Param param = new AirStrategyDTO.Param(); + param.setLineId(device.getLineId()); + param.setAction(array); + param.setParm(power); + airStrategyDTO.setParam(param); + + String json = gson.toJson(airStrategyDTO); + String topic = "/platform/devcmd/v1/" + device.getNdid(); + log.info("开始往{}发送控制命令:{}",topic,json); + publisher.send(topic, json, 1, false); + } + + private boolean waitForDeviceResponse(String deviceId) throws InterruptedException { + long startTime = System.currentTimeMillis(); + long timeout = 15000; // 15秒超时 + + while (System.currentTimeMillis() - startTime < timeout) { + // 检查是否收到响应(可通过共享变量或消息队列实现) + if (isDeviceResponded(deviceId)) { // 需要实现此方法 + return true; + } + Thread.sleep(1000); // 每秒检查一次 + } + return false; // 超时未响应 + } + + // 示例:检查设备是否响应(需根据实际MQTT订阅逻辑实现) + private boolean isDeviceResponded(String deviceId) { + // 例如:检查某个共享Map或数据库标记 + if(deviceResponseMap.containsKey(deviceId) && deviceResponseMap.get(deviceId)){ + System.out.println("匹配到已经响应完成"+deviceId); + return true; + } + return false; + } + + + @MqttSubscribe(value = "/platform/devack/#", qos = 1) public void airOperation(String topic, MqttMessage message, @Payload String payload) { System.out.println("收到网关反馈控制"+LocalDateTime.now()); System.out.println(message.toString()); - JSONObject jsonObject = new JSONObject(message.toString()); - String str = jsonObject.getStr("userId"); + JSONObject jsonObject = new JSONObject(payload); + String userId = jsonObject.getStr("userId"); - if (str.equals(clientId)) { - if (!jsonObject.getStr("code").equals("200")) { - mqttSendCount++; - if (mqttSendCount < 3) { - System.out.println("进入错误重发++++++"); - System.out.println("错误重发详情" + mqttJsonMsg); - publisher.send(mqttSendTopic, mqttJsonMsg, 1, false); - - } + if (clientId.equals(userId)) { + String code = jsonObject.getStr("code"); + String devId = jsonObject.get("deviceId").toString(); + if ("200".equals(code)) { + markDeviceAsResponded(devId); // 标记设备已响应 + } else { + System.err.println("设备 " + devId + " 返回错误: " + payload); } } - } + // 示例:标记设备响应(需实现具体逻辑) + private void markDeviceAsResponded(String deviceId) { + // 1. 更新内存中的状态(适用于单机应用) + deviceResponseMap.put(deviceId, true); + } + + /** * 校验参数,检查是否存在相同名称的空调策略