用能系统优化

This commit is contained in:
2025-08-25 09:16:50 +08:00
parent 06ea1008ad
commit c71e346492
12 changed files with 559 additions and 122 deletions

View File

@@ -43,6 +43,18 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dysmsapi</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>

View File

@@ -1,9 +1,7 @@
package com.njcn.energy.pojo.api;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.energy.pojo.api.fallback.EleAirStrategyFallbackFactory;
import com.njcn.energy.pojo.api.fallback.EleOnlineRateFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -24,6 +22,6 @@ public interface EleAirStrategyFeignClient {
* @date 2022/4/22
*/
@GetMapping("dealAirStrategyId")
HttpResult<Boolean> dealAirStrategyId(@RequestParam("operation") String operation);
void dealAirStrategyId(@RequestParam("operation") String operation);
}

View File

@@ -1,11 +1,9 @@
package com.njcn.energy.pojo.api;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.energy.pojo.api.fallback.EleOnlineRateFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
/**
@@ -23,12 +21,12 @@ public interface EnergyStatisticFeignClient {
* @date 2022/4/22
*/
@GetMapping("electricCalJob")
HttpResult<Boolean> electricCalJob();
void electricCalJob();
@GetMapping("eleOnlineRateJob")
HttpResult<Boolean> eleOnlineRateJobHandler();
void eleOnlineRateJobHandler();
@GetMapping("eleIntegrityJob")
HttpResult<Boolean> eleIntegrityJobHandler();
void eleIntegrityJobHandler();
}

View File

@@ -2,7 +2,6 @@ package com.njcn.energy.pojo.api.fallback;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.energy.pojo.api.EleAirStrategyFeignClient;
import com.njcn.energy.pojo.utils.EnergyEnumUtil;
import feign.hystrix.FallbackFactory;
@@ -33,7 +32,7 @@ public class EleAirStrategyFallbackFactory implements FallbackFactory<EleAirStra
Enum<?> finalExceptionEnum = exceptionEnum;
return new EleAirStrategyFeignClient() {
@Override
public HttpResult<Boolean> dealAirStrategyId(String operation) {
public void dealAirStrategyId(String operation) {
log.error("{}异常,降级处理,异常为:{}","空调控制策略数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}

View File

@@ -2,8 +2,6 @@ package com.njcn.energy.pojo.api.fallback;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.energy.pojo.api.EleAirStrategyFeignClient;
import com.njcn.energy.pojo.api.EnergyStatisticFeignClient;
import com.njcn.energy.pojo.utils.EnergyEnumUtil;
import feign.hystrix.FallbackFactory;
@@ -34,19 +32,19 @@ public class EnergyStatisticFallbackFactory implements FallbackFactory<EnergySta
Enum<?> finalExceptionEnum = exceptionEnum;
return new EnergyStatisticFeignClient() {
@Override
public HttpResult<Boolean> electricCalJob() {
public void electricCalJob() {
log.error("{}异常,降级处理,异常为:{}","调度统计电量增量",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<Boolean> eleOnlineRateJobHandler() {
public void eleOnlineRateJobHandler() {
log.error("{}异常,降级处理,异常为:{}","用能终端在线率数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
@Override
public HttpResult<Boolean> eleIntegrityJobHandler() {
public void eleIntegrityJobHandler() {
log.error("{}异常,降级处理,异常为:{}","用能数据完整性数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}

View File

@@ -0,0 +1,19 @@
package com.njcn.energy.pojo.constant;
public interface SmsConstant {
String DEFAULT_CONNECT_TIME_OUT = "sun.net.client.defaultConnectTimeout";
String DEFAULT_READ_TIME_OUT = "sun.net.client.defaultReadTimeout";
//短信API产品名称短信产品名固定无需修改
String PRODUCT = "Dysmsapi";
//短信API产品域名接口地址固定无需修改
String DOMAIN = "dysmsapi.aliyuncs.com";
//accessKeyId
String ACCESS_KEY_ID = "LTAI4FxsR76x2dq3w9c5puUe";
//accessKeySecret
String ACCESS_KEY_SECRET = "GxkTR8fsrvHtixTlD9UPmOGli35tZs";
//短信所属地
String LOCATION = "cn-hangzhou";
//签名
String SGIN = "灿能云";
}

View File

@@ -0,0 +1,20 @@
package com.njcn.energy.pojo.po;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/**
* @Author: cdf
* @CreateTime: 2025-05-12
* @Description: 光伏异常短信策略
*/
@Data
@TableName(value = "pv_power_strategy")
public class PvPowerStrategy {
private String id;
private Double alarmVal;
private String sendPhone;
}

View File

@@ -16,7 +16,6 @@ import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -52,7 +51,6 @@ public class StatisticDataRunController {
* 调度统计电量增量
*/
@GetMapping("electricCalJob")
@Async("asyncExecutor")
public void electricCalJob() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Calendar startTime = Calendar.getInstance();

View File

@@ -0,0 +1,181 @@
package com.njcn.energy.controller;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse;
import com.aliyuncs.http.MethodType;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.energy.mapper.EleEpdMapper;
import com.njcn.energy.mapper.EleLoadSetMapper;
import com.njcn.energy.mapper.PvPowerStrategyMapper;
import com.njcn.energy.pojo.constant.SmsConstant;
import com.njcn.energy.pojo.po.EleEpdPqd;
import com.njcn.energy.pojo.po.PvPowerStrategy;
import com.njcn.influx.utils.InfluxDbUtils;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.enums.DicDataTypeEnum;
import com.njcn.system.pojo.po.DictData;
import io.swagger.annotations.Api;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @Author: cdf
* @CreateTime: 2025-05-12
* @Description:
*/
@EnableScheduling
@Slf4j
@RequiredArgsConstructor
@RestController
@Api(tags = "光伏异常运行定时任务")
public class SunPowerScheduledController {
private final DicDataFeignClient dicDataFeignClient;
private final EleLoadSetMapper eleLoadSetMapper;
private final EleEpdMapper eleEpdMapper;
private final PvPowerStrategyMapper pvPowerStrategyMapper;
private static final String PARK_POWER_NOTICE = "SMS_485465792";
private final InfluxDbUtils influxDbUtils;
@Scheduled(cron = "0 50 8 * * ?")
@GetMapping("sunBadSendMessage")
public void sunBadSendMessage() {
try {
DateTime yesterday = DateUtil.yesterday();
String begin = DateUtil.format(DateUtil.beginOfDay(yesterday), DatePattern.NORM_DATETIME_PATTERN);
String end = DateUtil.format(DateUtil.endOfDay(yesterday), DatePattern.NORM_DATETIME_PATTERN);
EleEpdPqd eleEpdPqd = eleEpdMapper.selectById("64592326d968dd07ffc0437befcfa026");
if(Objects.isNull(eleEpdPqd)){
log.error("获取净有功电能配置失败,任务终止");
return;
}
DictData dictData = dicDataFeignClient.getDicDataByCodeAndType("Photovoltaic", DicDataTypeEnum.ELE_LOAD_TYPE.getCode()).getData();
if(Objects.isNull(dictData)){
log.error("获取光伏字典失败,任务终止");
return;
}
List<String> lineIdList = eleLoadSetMapper.getAllLineListRelation(dictData.getId());
log.info("获取监测点信息{}",lineIdList);
if (CollectionUtil.isNotEmpty(lineIdList)) {
StringBuilder lineId = new StringBuilder();
for (int i = 0; i < lineIdList.size(); i++) {
if (i == lineIdList.size() - 1) {
lineId.append("'").append(lineIdList.get(i)).append("'");
} else {
lineId.append("'").append(lineIdList.get(i)).append("'").append(" or LineId=");
}
}
StringBuilder sql = new StringBuilder("select ");
sql.append("SUM(").append(eleEpdPqd.getName()).append(") as ").append(eleEpdPqd.getName()).append(" from ").append(eleEpdPqd.getClassId()).append(" where (LineId = ").append(lineId).append(") and Phase= '").append(eleEpdPqd.getPhase())
.append("' and time>='").append(begin).append("' and time<= '").append(end).append("' and ").append(eleEpdPqd.getName()).append(" !=-1000000").append(" group by time(15m)");
log.info("执行sql-----------{}",sql);
List<Map<String, Object>> mapList = influxDbUtils.getResult(sql.toString(), "");
if (CollectionUtil.isNotEmpty(mapList)) {
List<Map<String, Object>> temList = mapList.stream().filter(it->Objects.nonNull(it.get("WattHr"))).collect(Collectors.toList());
log.info("返回结果-----------{}",+temList.size());
if(temList.size()<10){
//防止数据量太少导致的异常发送
log.error("power_data表统计日数据量太少判定为异常,任务终止");
return;
}
double max = temList.stream().mapToDouble(it->Double.parseDouble(it.get("WattHr").toString())).max().orElse(3.14159);
double min = temList.stream().mapToDouble(it->Double.parseDouble(it.get("WattHr").toString())).min().orElse(3.14159);
if(max != 3.14159 && min != 3.14159){
PvPowerStrategy pvPowerStrategy = pvPowerStrategyMapper.selectOne(new LambdaQueryWrapper<>());
if(Objects.isNull(pvPowerStrategy)){
log.error("表pv_power_strategy_光伏发电配置查询为空,任务终止");
return;
}
double val = NumberUtil.round(max-min,3).doubleValue();
log.info("当天发电量:{} kwh",+val);
if(val < pvPowerStrategy.getAlarmVal()){
log.info("净电度差值 {} 低于阈值 {}, 准备发送报警短信", val, pvPowerStrategy.getAlarmVal());
String[] phoneArr = pvPowerStrategy.getSendPhone().split(StrUtil.COMMA);
for(String ph : phoneArr){
sendMobileMessage(DateUtil.format(yesterday,DatePattern.NORM_DATE_PATTERN),String.valueOf(val), ph);
}
}else {
log.info("净电度差值 {} 高于阈值 {}, 无需报警", val, pvPowerStrategy.getAlarmVal());
}
}
}
}
} catch (Exception exception) {
exception.printStackTrace();
}
}
/**
* 发送手机短信
*
* @return 推送的响应
*/
private static SendSmsResponse sendMobileMessage(String date, String value, String phone) {
SendSmsResponse sendSmsResponse = new SendSmsResponse();
//开始执行短信发送
try {
//设置超时时间-可自行调整
System.setProperty(SmsConstant.DEFAULT_CONNECT_TIME_OUT, "10000");
System.setProperty(SmsConstant.DEFAULT_READ_TIME_OUT, "10000");
IClientProfile profile = DefaultProfile.getProfile(SmsConstant.LOCATION, SmsConstant.ACCESS_KEY_ID, SmsConstant.ACCESS_KEY_SECRET);
DefaultProfile.addEndpoint(SmsConstant.LOCATION, SmsConstant.LOCATION, SmsConstant.PRODUCT, SmsConstant.DOMAIN);
IAcsClient acsClient = new DefaultAcsClient(profile);
SendSmsRequest request = new SendSmsRequest();
request.setMethod(MethodType.POST);
request.setPhoneNumbers(phone);
//必填:短信签名-可在短信控制台中找到
request.setSignName(SmsConstant.SGIN);
//必填:短信模板-可在短信控制台中找到,发送国际/港澳台消息时,请使用国际/港澳台短信模版
request.setTemplateCode(PARK_POWER_NOTICE);
String messgae = "{\"date\":\"" + date+ "\",\"value\":\"" + value + "\"}";
request.setTemplateParam(messgae);
//请求失败这里会抛ClientException异常
sendSmsResponse = acsClient.getAcsResponse(request);
if (sendSmsResponse.getCode() != null && sendSmsResponse.getCode().equals("OK")) {
//成功发送短信验证码后保存进redis
log.info("给用户发送光伏发电异常短信成功,手机号为:{}", phone);
} else {
log.info("给用户发送光伏发电异常短信失败,手机号为:{},失败原因为:{}", phone, sendSmsResponse.getMessage());
}
} catch (Exception e) {
log.error("用能光伏发电自动定时任务发送短信异常:" + e.getMessage());
}
return sendSmsResponse;
}
public static void main(String[] args) {
try {
sendMobileMessage("2025-05-11","8", "18815508963");
} catch (Exception exception) {
exception.printStackTrace();
}
}
}

View File

@@ -0,0 +1,13 @@
package com.njcn.energy.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.energy.pojo.po.PvPowerStrategy;
/**
* pqs
*
* @author cdf
* @date 2022/4/21
*/
public interface PvPowerStrategyMapper extends BaseMapper<PvPowerStrategy> {
}

View File

@@ -0,0 +1,188 @@
//package com.njcn.energy.scheduled;
//
//import cn.hutool.core.collection.CollectionUtil;
//import cn.hutool.core.date.DatePattern;
//import cn.hutool.core.date.DateTime;
//import cn.hutool.core.date.DateUtil;
//import cn.hutool.core.util.StrUtil;
//import com.aliyuncs.DefaultAcsClient;
//import com.aliyuncs.IAcsClient;
//import com.aliyuncs.dysmsapi.model.v20170525.SendSmsRequest;
//import com.aliyuncs.dysmsapi.model.v20170525.SendSmsResponse;
//import com.aliyuncs.http.MethodType;
//import com.aliyuncs.profile.DefaultProfile;
//import com.aliyuncs.profile.IClientProfile;
//import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
//import com.njcn.common.pojo.enums.response.CommonResponseEnum;
//import com.njcn.common.pojo.exception.BusinessException;
//import com.njcn.energy.mapper.EleEpdMapper;
//import com.njcn.energy.mapper.EleLoadSetMapper;
//import com.njcn.energy.mapper.EleMdMapper;
//import com.njcn.energy.mapper.PvPowerStrategyMapper;
//import com.njcn.energy.pojo.constant.SmsConstant;
//import com.njcn.energy.pojo.po.EleEpdPqd;
//import com.njcn.energy.pojo.po.EleMd;
//import com.njcn.energy.pojo.po.PvPowerStrategy;
//import com.njcn.influx.utils.InfluxDbUtils;
//import com.njcn.system.api.DicDataFeignClient;
//import com.njcn.system.enums.DicDataTypeEnum;
//import com.njcn.system.pojo.po.DictData;
//import com.njcn.user.enums.MessageEnum;
//import io.swagger.annotations.Api;
//import lombok.RequiredArgsConstructor;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.scheduling.annotation.EnableScheduling;
//import org.springframework.scheduling.annotation.Scheduled;
//import org.springframework.web.bind.annotation.GetMapping;
//import org.springframework.web.bind.annotation.RestController;
//
//import java.math.BigDecimal;
//import java.math.RoundingMode;
//import java.time.LocalDateTime;
//import java.util.ArrayList;
//import java.util.List;
//import java.util.Map;
//import java.util.Objects;
//import java.util.stream.Collectors;
//
///**
// * @Author: cdf
// * @CreateTime: 2025-05-09
// * @Description: 光伏异常短信通知
// */
//@EnableScheduling
//@Slf4j
//@RequiredArgsConstructor
//@RestController
//@Api(tags = "定时任务")
//public class SunPowerScheduled {
//
// private final DicDataFeignClient dicDataFeignClient;
//
// private final EleLoadSetMapper eleLoadSetMapper;
//
// private final EleEpdMapper eleEpdMapper;
//
// private final PvPowerStrategyMapper pvPowerStrategyMapper;
//
// private static final String PARK_POWER_NOTICE = "SMS_485465792";
//
// private final InfluxDbUtils influxDbUtils;
//
// @Scheduled(cron = "0 50 8 * * ?")
// //@GetMapping("cnSunBadSendMessage")
// public void cnSunBadSendMessage() {
// try {
// DateTime yesterday = DateUtil.yesterday();
// String begin = DateUtil.format(DateUtil.beginOfDay(yesterday), DatePattern.NORM_DATETIME_PATTERN);
// String end = DateUtil.format(DateUtil.endOfDay(yesterday), DatePattern.NORM_DATETIME_PATTERN);
// EleEpdPqd eleEpdPqd = eleEpdMapper.selectById("64592326d968dd07ffc0437befcfa026");
// if(Objects.isNull(eleEpdPqd)){
// log.error("获取净有功电能配置失败,任务终止");
// return;
// }
// DictData dictData = dicDataFeignClient.getDicDataByCodeAndType("Photovoltaic",DicDataTypeEnum.ELE_LOAD_TYPE.getCode()).getData();
// if(Objects.isNull(dictData)){
// log.error("获取光伏字典失败,任务终止");
// return;
// }
// List<String> lineIdList = eleLoadSetMapper.getAllLineListRelation(dictData.getId());
// log.info("获取监测点信息{}",lineIdList);
// if (CollectionUtil.isNotEmpty(lineIdList)) {
// StringBuilder lineId = new StringBuilder();
// for (int i = 0; i < lineIdList.size(); i++) {
// if (i == lineIdList.size() - 1) {
// lineId.append("'").append(lineIdList.get(i)).append("'");
// } else {
// lineId.append("'").append(lineIdList.get(i)).append("'").append(" or LineId=");
// }
// }
// StringBuilder sql = new StringBuilder("select ");
// sql.append("SUM(").append(eleEpdPqd.getName()).append(") as ").append(eleEpdPqd.getName()).append(" from ").append(eleEpdPqd.getClassId()).append(" where (LineId = ").append(lineId).append(") and Phase= '").append(eleEpdPqd.getPhase())
// .append("' and time>='").append(begin).append("' and time<= '").append(end).append("' and ").append(eleEpdPqd.getName()).append(" !=-1000000").append(" group by time(15m)");
// log.info("执行sql-----------{}",sql);
// List<Map<String, Object>> mapList = influxDbUtils.getResult(sql.toString(), "");
//
// if (CollectionUtil.isNotEmpty(mapList)) {
// List<Map<String, Object>> temList = mapList.stream().filter(it->Objects.nonNull(it.get("WattHr"))).collect(Collectors.toList());
// log.info("返回结果-----------{}",+temList.size());
// if(temList.size()<30){
// //防止数据量太少导致的异常发送
// log.error("power_data表统计日数据量太少判定为异常,任务终止");
// return;
// }
//
// double max = temList.stream().mapToDouble(it->Double.parseDouble(it.get("WattHr").toString())).max().orElse(3.14159);
// double min = temList.stream().mapToDouble(it->Double.parseDouble(it.get("WattHr").toString())).min().orElse(3.14159);
// if(max != 3.14159 && min != 3.14159){
// PvPowerStrategy pvPowerStrategy = pvPowerStrategyMapper.selectOne(new LambdaQueryWrapper<>());
// if(Objects.isNull(pvPowerStrategy)){
// log.error("表pv_power_strategy_光伏发电配置查询为空,任务终止");
// return;
// }
// double val = new BigDecimal(max-min).setScale(3, RoundingMode.HALF_UP).doubleValue();
// log.info("当天发电量:{} kwh",+val);
// if(val < pvPowerStrategy.getAlarmVal()){
// log.info("净电度差值 {} 低于阈值 {}, 准备发送报警短信", val, pvPowerStrategy.getAlarmVal());
// String[] phoneArr = pvPowerStrategy.getSendPhone().split(StrUtil.COMMA);
// for(String ph : phoneArr){
// //sendMobileMessage(DateUtil.format(yesterday,DatePattern.NORM_DATE_PATTERN),String.valueOf(val), ph);
// }
// }else {
// log.info("净电度差值 {} 高于阈值 {}, 无需报警", val, pvPowerStrategy.getAlarmVal());
// }
// }
// }
// }
// } catch (Exception exception) {
// exception.printStackTrace();
// }
// }
//
// /**
// * 发送手机短信
// *
// * @return 推送的响应
// */
// private static SendSmsResponse sendMobileMessage(String date,String value,String phone) {
// SendSmsResponse sendSmsResponse = new SendSmsResponse();
// //开始执行短信发送
// try {
// //设置超时时间-可自行调整
// System.setProperty(SmsConstant.DEFAULT_CONNECT_TIME_OUT, "10000");
// System.setProperty(SmsConstant.DEFAULT_READ_TIME_OUT, "10000");
// IClientProfile profile = DefaultProfile.getProfile(SmsConstant.LOCATION, SmsConstant.ACCESS_KEY_ID, SmsConstant.ACCESS_KEY_SECRET);
// DefaultProfile.addEndpoint(SmsConstant.LOCATION, SmsConstant.LOCATION, SmsConstant.PRODUCT, SmsConstant.DOMAIN);
// IAcsClient acsClient = new DefaultAcsClient(profile);
// SendSmsRequest request = new SendSmsRequest();
// request.setMethod(MethodType.POST);
// request.setPhoneNumbers(phone);
// //必填:短信签名-可在短信控制台中找到
// request.setSignName(SmsConstant.SGIN);
// //必填:短信模板-可在短信控制台中找到,发送国际/港澳台消息时,请使用国际/港澳台短信模版
// request.setTemplateCode(PARK_POWER_NOTICE);
// String messgae = "{\"date\":\"" + date+ "\",\"value\":\"" + value + "\"}";
// request.setTemplateParam(messgae);
// //请求失败这里会抛ClientException异常
// sendSmsResponse = acsClient.getAcsResponse(request);
// if (sendSmsResponse.getCode() != null && sendSmsResponse.getCode().equals("OK")) {
// //成功发送短信验证码后保存进redis
// log.info("给用户发送光伏发电异常短信成功,手机号为:{}", phone);
// } else {
// log.info("给用户发送光伏发电异常短信失败,手机号为:{},失败原因为:{}", phone, sendSmsResponse.getMessage());
// }
// } catch (Exception e) {
// log.error("用能光伏发电自动定时任务发送短信异常:" + e.getMessage());
// }
// return sendSmsResponse;
// }
//
//
// public static void main(String[] args) {
// try {
// sendMobileMessage("2025-05-11","8", "18815508963");
// } catch (Exception exception) {
// exception.printStackTrace();
// }
// }
//}

View File

@@ -38,6 +38,7 @@ import com.njcn.web.factory.PageFactory;
import com.njcn.web.utils.RequestUtil;
//import com.xxl.job.core.biz.model.ReturnT;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.BeanUtils;
@@ -48,10 +49,10 @@ import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
@@ -62,8 +63,13 @@ import java.util.stream.Collectors;
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class EleAirStrategyServiceImpl extends ServiceImpl<AirStrategyMapper, AirStrategy> implements EleAirStrategyService {
// 存储设备响应状态的共享Map线程安全
private final Map<String, Boolean> deviceResponseMap = new ConcurrentHashMap<>();
private final IEleSetService iEleSetService;
//private final JobFeignClient jobFeignClient;
@@ -458,153 +464,160 @@ public class EleAirStrategyServiceImpl extends ServiceImpl<AirStrategyMapper, Ai
@Override
public void dealAirStrategyId(String operation) {
ExecutorService executor = Executors.newSingleThreadExecutor(); // 顺序执行
try {
AirStrategy airStrategy = this.list(new LambdaQueryWrapper<>()).get(0);
if (Objects.isNull(airStrategy)) {
throw new BusinessException(CommonResponseEnum.FAIL);
}
LambdaQueryWrapper<AirSet> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(AirSet::getId, airStrategy.getId());
List<AirSet> list = iEleSetService.list(lambdaQueryWrapper);
if (CollectionUtil.isNotEmpty(list)) {
List<String> lineIds = list.stream().map(AirSet::getLineId).collect(Collectors.toList());
//如果是关闭操作
// 如果是关闭操作,过滤出当前开启的空调
if ("close".equals(operation)) {
StringBuilder temSql = assToInfluxParam(lineIds);
String sql = "select LineId,ACInStat from air_data_Real where " + temSql + " and ACInStat = 'On'";
QueryResult result = influxDbUtils.query(sql);
List<QueryResult.Series> listSeries = result.getResults().get(0).getSeries();
if (!CollectionUtils.isEmpty(listSeries)) {
List<String> lineIdTem = new ArrayList<>();
listSeries.get(0).getValues().forEach(item -> {
lineIdTem.add(item.get(1).toString());
});
lineIds = lineIdTem;
lineIds = listSeries.get(0).getValues().stream()
.map(item -> item.get(1).toString())
.collect(Collectors.toList());
}
}
System.out.println(lineIds);
List<NetAndDevByLineDTO> netDevList = logicDeviceLineMapper.getNetAndDevByLineIds(lineIds);
if (CollectionUtil.isNotEmpty(netDevList)) {
for (NetAndDevByLineDTO netAndDevByLineDTO : netDevList) {
// 提交任务到线程池,按顺序执行
executor.submit(() -> {
for (NetAndDevByLineDTO device : netDevList) {
boolean success = false;
int retryCount = 0;
Gson gson = new Gson();
String array = "ACInStat";
String power = "";
if ("open".equals(operation)) {
//开启空调
for (int i = 0; i < 4; i++) {
if (i == 0) {
power = "On";
array = "ACInStat";
} else if (i == 1) {
power = airStrategy.getMode();
array = "ACInMode";
} else if (i == 2) {
power = airStrategy.getTemperature().toString();
array = "ACInSetTmp";
} else {
power = airStrategy.getWind();
array = "ACInSpeed";
// 每个设备独立重试3次
while (retryCount < 3 && !success) {
try {
sendAirControlCommand(device, operation, airStrategy);
success = waitForDeviceResponse(device.getDevId()); // 等待响应
retryCount++;
} catch (Exception e) {
retryCount++;
if (retryCount >= 3) {
System.err.println("设备 " + device.getDevId() + " 控制失败,已达最大重试次数");
}
}
AirStrategyDTO airStrategyDTO = new AirStrategyDTO();
airStrategyDTO.setMid(1);
airStrategyDTO.setDeviceId(netAndDevByLineDTO.getDevId());
airStrategyDTO.setUserId(clientId);
airStrategyDTO.setTimestamp(Calendar.getInstance().getTimeInMillis());
airStrategyDTO.setExpire(-1);
airStrategyDTO.setType("CMD_DEV_CTRL");
AirStrategyDTO.Param param = new AirStrategyDTO.Param();
param.setLineId(netAndDevByLineDTO.getLineId());
param.setAction(array);
param.setParm(power);
airStrategyDTO.setParam(param);
String json = gson.toJson(airStrategyDTO);
System.out.println(json);
String topic = "/platform/devcmd/v1/" + netAndDevByLineDTO.getNdid();
publisher.send(topic, json, 1, false);
this.mqttSendCount = 0;
this.mqttJsonMsg = json;
this.mqttSendTopic = topic;
}
} else {
//关闭空调
power = "Off";
AirStrategyDTO airStrategyDTO = new AirStrategyDTO();
airStrategyDTO.setMid(1);
airStrategyDTO.setDeviceId(netAndDevByLineDTO.getDevId());
airStrategyDTO.setUserId(clientId);
airStrategyDTO.setTimestamp(Calendar.getInstance().getTimeInMillis());
airStrategyDTO.setExpire(-1);
airStrategyDTO.setType("CMD_DEV_CTRL");
AirStrategyDTO.Param param = new AirStrategyDTO.Param();
param.setLineId(netAndDevByLineDTO.getLineId());
param.setAction(array);
param.setParm(power);
airStrategyDTO.setParam(param);
String json = gson.toJson(airStrategyDTO);
System.out.println(json);
String topic = "/platform/devcmd/v1/" + netAndDevByLineDTO.getNdid();
publisher.send(topic, json, 1, false);
this.mqttSendCount = 0;
this.mqttJsonMsg = json;
this.mqttSendTopic = topic;
System.out.println("发送关空调控制"+LocalDateTime.now());
if (!success) {
log.error("跳过设备: " + device.getDevId() + "(控制失败或无响应)");
}
}
Thread.sleep(8000);
}
if ("close".equals(operation) && airStrategy.getType() == 0) {
AirStrategy air = new AirStrategy();
air.setId(airStrategy.getId());
air.setStatus(2);
airStrategyMapper.updateById(air);
}
// 所有设备控制完成后更新状态
if ("close".equals(operation) && airStrategy.getType() == 0) {
AirStrategy update = new AirStrategy();
update.setId(airStrategy.getId());
update.setStatus(2);
airStrategyMapper.updateById(update);
}
});
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdown(); // 关闭线程池
deviceResponseMap.clear();
}
}
private Integer mqttSendCount = 0;
private String mqttJsonMsg;
private String mqttSendTopic;
private void sendAirControlCommand(NetAndDevByLineDTO device, String operation, AirStrategy airStrategy) {
Gson gson = new Gson();
String power = "open".equals(operation) ? "On" : "Off";
String array = "ACInStat";
AirStrategyDTO airStrategyDTO = new AirStrategyDTO();
airStrategyDTO.setMid(1);
airStrategyDTO.setDeviceId(device.getDevId());
airStrategyDTO.setUserId(clientId);
airStrategyDTO.setTimestamp(Calendar.getInstance().getTimeInMillis());
airStrategyDTO.setExpire(-1);
airStrategyDTO.setType("CMD_DEV_CTRL");
AirStrategyDTO.Param param = new AirStrategyDTO.Param();
param.setLineId(device.getLineId());
param.setAction(array);
param.setParm(power);
airStrategyDTO.setParam(param);
String json = gson.toJson(airStrategyDTO);
String topic = "/platform/devcmd/v1/" + device.getNdid();
log.info("开始往{}发送控制命令:{}",topic,json);
publisher.send(topic, json, 1, false);
}
private boolean waitForDeviceResponse(String deviceId) throws InterruptedException {
long startTime = System.currentTimeMillis();
long timeout = 15000; // 15秒超时
while (System.currentTimeMillis() - startTime < timeout) {
// 检查是否收到响应(可通过共享变量或消息队列实现)
if (isDeviceResponded(deviceId)) { // 需要实现此方法
return true;
}
Thread.sleep(1000); // 每秒检查一次
}
return false; // 超时未响应
}
// 示例检查设备是否响应需根据实际MQTT订阅逻辑实现
private boolean isDeviceResponded(String deviceId) {
// 例如检查某个共享Map或数据库标记
if(deviceResponseMap.containsKey(deviceId) && deviceResponseMap.get(deviceId)){
System.out.println("匹配到已经响应完成"+deviceId);
return true;
}
return false;
}
@MqttSubscribe(value = "/platform/devack/#", qos = 1)
public void airOperation(String topic, MqttMessage message, @Payload String payload) {
System.out.println("收到网关反馈控制"+LocalDateTime.now());
System.out.println(message.toString());
JSONObject jsonObject = new JSONObject(message.toString());
String str = jsonObject.getStr("userId");
JSONObject jsonObject = new JSONObject(payload);
String userId = jsonObject.getStr("userId");
if (str.equals(clientId)) {
if (!jsonObject.getStr("code").equals("200")) {
mqttSendCount++;
if (mqttSendCount < 3) {
System.out.println("进入错误重发++++++");
System.out.println("错误重发详情" + mqttJsonMsg);
publisher.send(mqttSendTopic, mqttJsonMsg, 1, false);
}
if (clientId.equals(userId)) {
String code = jsonObject.getStr("code");
String devId = jsonObject.get("deviceId").toString();
if ("200".equals(code)) {
markDeviceAsResponded(devId); // 标记设备已响应
} else {
System.err.println("设备 " + devId + " 返回错误: " + payload);
}
}
}
// 示例:标记设备响应(需实现具体逻辑)
private void markDeviceAsResponded(String deviceId) {
// 1. 更新内存中的状态(适用于单机应用)
deviceResponseMap.put(deviceId, true);
}
/**
* 校验参数,检查是否存在相同名称的空调策略