4 Commits

8 changed files with 576 additions and 42 deletions

View File

@@ -54,17 +54,31 @@ public class EventGateServiceImpl implements EventGateService {
String lineid = eventDetail.getLineId(); String lineid = eventDetail.getLineId();
LedgerBaseInfo pqLine = ledgerScaleMapper.getLedgerBaseInfo(Stream.of(lineid).collect(Collectors.toList())).get(0); LedgerBaseInfo pqLine = ledgerScaleMapper.getLedgerBaseInfo(Stream.of(lineid).collect(Collectors.toList())).get(0);
String waveName = eventDetail.getWavePath(); String waveName = eventDetail.getWavePath();
String cfgPath, datPath; String cfgPath, datPath,cfgPath1,datPath1;
InputStream cfgStream,datStream;
if (StrUtil.isBlank(waveName)) { if (StrUtil.isBlank(waveName)) {
throw new BusinessException(WaveFileResponseEnum.ANALYSE_WAVE_NOT_FOUND); throw new BusinessException(WaveFileResponseEnum.ANALYSE_WAVE_NOT_FOUND);
} }
try {
cfgPath = generalInfo.getBusinessWavePath()+ File.separator+pqLine.getIp()+"/"+waveName+".CFG"; cfgPath = generalInfo.getBusinessWavePath()+ File.separator+pqLine.getIp()+"/"+waveName+".CFG";
datPath = generalInfo.getBusinessWavePath()+ File.separator+pqLine.getIp()+"/"+waveName+".DAT"; datPath = generalInfo.getBusinessWavePath()+ File.separator+pqLine.getIp()+"/"+waveName+".DAT";
cfgStream = waveFileComponent.getFileInputStreamByFilePath(cfgPath);
datStream = waveFileComponent.getFileInputStreamByFilePath(datPath);
log.info("本地磁盘波形文件路径----" + cfgPath); log.info("本地磁盘波形文件路径----" + cfgPath);
InputStream cfgStream = waveFileComponent.getFileInputStreamByFilePath(cfgPath);
InputStream datStream = waveFileComponent.getFileInputStreamByFilePath(datPath); }catch (Exception e){
cfgPath1 = generalInfo.getBusinessWavePath()+ File.separator+pqLine.getIp()+"/"+waveName+".cfg";
datPath1 = generalInfo.getBusinessWavePath()+ File.separator+pqLine.getIp()+"/"+waveName+".dat";
cfgStream = waveFileComponent.getFileInputStreamByFilePath(cfgPath1);
datStream = waveFileComponent.getFileInputStreamByFilePath(datPath1);
}
if (Objects.isNull(cfgStream) || Objects.isNull(datStream)) { if (Objects.isNull(cfgStream) || Objects.isNull(datStream)) {
throw new BusinessException(WaveFileResponseEnum.ANALYSE_WAVE_NOT_FOUND); throw new BusinessException(WaveFileResponseEnum.ANALYSE_WAVE_NOT_FOUND);
} }
waveDataDTO = waveFileComponent.getComtrade(cfgStream, datStream, 1); waveDataDTO = waveFileComponent.getComtrade(cfgStream, datStream, 1);

View File

@@ -1,6 +1,7 @@
package com.njcn.product.event.transientes.controller; package com.njcn.product.event.transientes.controller;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.date.LocalDateTimeUtil;
@@ -71,6 +72,8 @@ public class EventGateController extends BaseController {
@Value("${SYS_TYPE_ZT}") @Value("${SYS_TYPE_ZT}")
private String sysTypeZt; private String sysTypeZt;
@Value("${business.RealTimeSMSSwitch:false}")
private boolean realTimeSMSSwitch;
private final WebSocketServer webSocketServer; private final WebSocketServer webSocketServer;
@@ -97,6 +100,17 @@ public class EventGateController extends BaseController {
private final ThreadPoolTaskExecutor smsTaskExecutor; private final ThreadPoolTaskExecutor smsTaskExecutor;
private final PqlineCache pqlineCache; private final PqlineCache pqlineCache;
private final SendMessageService messageService;
@GetMapping("/testSendMessage")
@ApiOperation("接收远程推送的暂态事件")
public HttpResult<Object> SendMessage(@RequestParam("startTime") String startTime,@RequestParam("endtTime") String endtTime) {
String methodDescribe = getMethodDescribe("SendMessage");
messageService.sendMessage(LocalDateTimeUtil.parse(startTime,DatePattern.NORM_DATETIME_PATTERN),LocalDateTimeUtil.parse(endtTime,DatePattern.NORM_DATETIME_PATTERN));
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
@OperateInfo @OperateInfo
@GetMapping("/eventMsg") @GetMapping("/eventMsg")
@ApiOperation("接收远程推送的暂态事件") @ApiOperation("接收远程推送的暂态事件")
@@ -116,16 +130,34 @@ public class EventGateController extends BaseController {
&& Float.parseFloat(jsonObject.get("eventvalue").toString()) <= msgEventConfigService.getEventValue() && Float.parseFloat(jsonObject.get("eventvalue").toString()) <= msgEventConfigService.getEventValue()
&& (Float.parseFloat(jsonObject.get("persisttime").toString()) * 1000) >= msgEventConfigService.getEventDuration()) { && (Float.parseFloat(jsonObject.get("persisttime").toString()) * 1000) >= msgEventConfigService.getEventDuration()) {
//过滤重要暂降事件 //过滤重要暂降事件
//F47过滤
jsonObject.set("persisttime",new BigDecimal(jsonObject.get("persisttime").toString()).setScale(3,RoundingMode.HALF_UP).toString()); float eventvalue = Float.parseFloat(jsonObject.get("eventvalue").toString());
float persisttime = Float.parseFloat(jsonObject.get("persisttime").toString()) * 1000;
if(shouldSendSMS(eventvalue,persisttime)){
Integer lineId = Integer.valueOf(jsonObject.get("lineid").toString()); Integer lineId = Integer.valueOf(jsonObject.get("lineid").toString());
List<PqUserLineAssPO> assList = pqUserLineAssMapper.selectList(new LambdaQueryWrapper<PqUserLineAssPO>().eq(PqUserLineAssPO::getLineIndex, lineId)); List<PqUserLineAssPO> assList = pqUserLineAssMapper.selectList(new LambdaQueryWrapper<PqUserLineAssPO>().eq(PqUserLineAssPO::getLineIndex, lineId));
String str; String str ;
if (CollUtil.isNotEmpty(assList)) { if (CollUtil.isNotEmpty(assList)) {
List<String> userIds = assList.stream().map(PqUserLineAssPO::getUserIndex).distinct().collect(Collectors.toList()); List<String> userIds = assList.stream().map(PqUserLineAssPO::getUserIndex).distinct().collect(Collectors.toList());
List<PqUserLedgerPO> poList = pqUserLedgerMapper.selectList(new LambdaQueryWrapper<PqUserLedgerPO>().select(PqUserLedgerPO::getId, PqUserLedgerPO::getCustomerName).in(PqUserLedgerPO::getId, userIds)); List<PqUserLedgerPO> poList = pqUserLedgerMapper.selectList(new LambdaQueryWrapper<PqUserLedgerPO>().select(PqUserLedgerPO::getId, PqUserLedgerPO::getCustomerName,PqUserLedgerPO::getIsShow).in(PqUserLedgerPO::getId, userIds));
str = poList.stream().map(PqUserLedgerPO::getCustomerName).collect(Collectors.joining(";")); List<PqUserLedgerPO> isShowUser = poList.stream().filter(temp -> temp.getIsShow() == 1).collect(Collectors.toList());
List<PqUserLedgerPO> noShowUser = poList.stream().filter(temp -> temp.getIsShow() != 1).collect(Collectors.toList());
String strUser="" ;
if(CollectionUtil.isEmpty(isShowUser)){
strUser =strUser+"影响集成电路敏感用户:无;";
}else {
String sensitiveUser = isShowUser.stream().map(PqUserLedgerPO::getCustomerName).collect(Collectors.joining(";"));
strUser =strUser+"影响集成电路敏感用户:"+sensitiveUser+";";
}
if(CollectionUtil.isEmpty(noShowUser)){
strUser =strUser+"影响其他用户:无;";
}else {
String sensitiveUser = noShowUser.stream().map(PqUserLedgerPO::getCustomerName).collect(Collectors.joining(";"));
strUser =strUser+"影响其他用户:"+sensitiveUser+";";
}
str = strUser;
} else { } else {
str = "/"; str = "/";
} }
@@ -142,11 +174,18 @@ public class EventGateController extends BaseController {
webSocketServer.sendMessageToAll(jsonObject.toString()); webSocketServer.sendMessageToAll(jsonObject.toString());
//针对前置推送的暂降事件进行短信发送功能 //针对前置推送的暂降事件进行短信发送功能
//开启实时短信功能默认关闭走批量
if(realTimeSMSSwitch){
smsTaskExecutor.execute(() -> { smsTaskExecutor.execute(() -> {
sendMessage(jsonObject, str); sendMessage(jsonObject, str);
}); });
} }
}
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
log.error("暂降json格式异常!{}", e.getMessage()); log.error("暂降json格式异常!{}", e.getMessage());
@@ -154,6 +193,27 @@ public class EventGateController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
} }
private boolean shouldSendSMS( Float value , Float time ) {
// 条件1: 电压降至50%以下持续时间超过20ms
if (value < 0.5 && time >= 20) {
return true;
}
// 条件2: 电压降至50%—70%持续时间超过200ms
if (value >= 50 && value < 70 && time >= 200) {
return true;
}
// 条件3: 电压降至70%—80%持续时间超过500ms
if (value >= 70 && value < 80 && time >= 500) {
return true;
}
return false;
}
public Set<String> getAllParentIdsRecursive(String deptId, Map<String, PqsDepts> deptMap, Set<String> result) { public Set<String> getAllParentIdsRecursive(String deptId, Map<String, PqsDepts> deptMap, Set<String> result) {
if (deptId == null || result.contains(deptId)) { if (deptId == null || result.contains(deptId)) {
return result; return result;
@@ -372,8 +432,9 @@ public class EventGateController extends BaseController {
StringBuilder stringBuilder = new StringBuilder(jsonObject.get("timeid").toString()); StringBuilder stringBuilder = new StringBuilder(jsonObject.get("timeid").toString());
String busName = jsonObject.containsKey("busname")? "_"+jsonObject.get("busname").toString():""; String busName = jsonObject.containsKey("busname")? "_"+jsonObject.get("busname").toString():"";
BigDecimal bigDecimal = new BigDecimal(jsonObject.get("eventvalue").toString()).multiply(new BigDecimal(100)).setScale(2, RoundingMode.HALF_UP); BigDecimal bigDecimal = new BigDecimal(jsonObject.get("eventvalue").toString()).multiply(new BigDecimal(100)).setScale(2, RoundingMode.HALF_UP);
stringBuilder.append(".").append(jsonObject.get("ms").toString()).append(",").append(jsonObject.get("bdname").toString()).append(busName).append("_").append(jsonObject.get("pointname").toString()) stringBuilder.append(".").append(jsonObject.get("ms").toString()).append(",").append(jsonObject.get("bdname").toString()).append(busName)
.append("发生电压暂降事件,事件残余电压").append(bigDecimal).append("%,持续时间:").append(jsonObject.get("persisttime").toString()).append("S;影响用户:"); // .append("_").append(jsonObject.get("pointname").toString())
.append("发生电压暂降事件,事件残余电压").append(bigDecimal).append("%,持续时间:").append(String.format("%.3f", Double.parseDouble(jsonObject.get("persisttime").toString()))).append("S;");
stringBuilder.append(objStr); stringBuilder.append(objStr);
String message; String message;

View File

@@ -0,0 +1,35 @@
package com.njcn.product.event.transientes.job;
import com.njcn.product.event.transientes.service.SendMessageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* Description:
* Date: 2026/05/29 上午 10:45【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Component
@EnableScheduling
@RequiredArgsConstructor
@Slf4j
public class SendMessageJob {
private final SendMessageService messageService;
@Scheduled(cron = "${business.sendMessageCronExpression}")
public void executeJob() {
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
// 减去一个小时
LocalDateTime threeHourAgo = now.minusHours(3);
messageService.sendMessage(threeHourAgo,now);
}
}

View File

@@ -34,7 +34,7 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
protected void configure(HttpSecurity http) throws Exception { protected void configure(HttpSecurity http) throws Exception {
http.csrf().disable() http.csrf().disable()
.authorizeRequests() .authorizeRequests()
.antMatchers("/cn_authenticate","/ws/**","/accept/testEvent","/accept/eventMsg","/accept/simpleTest","/accept/phoneSend","/accept/refreshIdCache","/largescreen/sycnUser").permitAll() // 允许访问认证接口 .antMatchers("/cn_authenticate","/ws/**","/accept/testEvent","/accept/eventMsg","/accept/simpleTest","/accept/phoneSend","/accept/refreshIdCache","/accept/testSendMessage","/largescreen/sycnUser").permitAll() // 允许访问认证接口
// .antMatchers("/**").permitAll() // 允许访问认证接口 // .antMatchers("/**").permitAll() // 允许访问认证接口
.anyRequest().authenticated() .anyRequest().authenticated()
.and() .and()

View File

@@ -0,0 +1,14 @@
package com.njcn.product.event.transientes.service;
import java.time.LocalDateTime;
/**
* Description:
* Date: 2026/05/29 上午 11:30【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface SendMessageService {
void sendMessage(LocalDateTime threeHourAgo, LocalDateTime now);
}

View File

@@ -697,13 +697,14 @@ public class LargeScreenCountServiceImpl implements LargeScreenCountService {
DateTime start = DateUtil.beginOfDay(DateUtil.parse(largeScreenCountParam.getSearchBeginTime())); DateTime start = DateUtil.beginOfDay(DateUtil.parse(largeScreenCountParam.getSearchBeginTime()));
DateTime end = DateUtil.endOfDay(DateUtil.parse(largeScreenCountParam.getSearchEndTime())); DateTime end = DateUtil.endOfDay(DateUtil.parse(largeScreenCountParam.getSearchEndTime()));
LambdaQueryWrapper<MsgEventInfo> lambdaQueryWrapper = new LambdaQueryWrapper<>(); QueryWrapper<MsgEventInfo> queryWrapper = new QueryWrapper<>();
lambdaQueryWrapper.eq(!StringUtils.isEmpty(largeScreenCountParam.getSendResult()), MsgEventInfo::getSendResult, largeScreenCountParam.getSendResult()); queryWrapper.select("DISTINCT msg_index,user_id, user_name,send_time,msg_content,phone,send_result,is_handle ");
lambdaQueryWrapper.orderByDesc(MsgEventInfo::getSendTime).between(MsgEventInfo::getSendTime, start, end); queryWrapper.lambda().eq(!StringUtils.isEmpty(largeScreenCountParam.getSendResult()), MsgEventInfo::getSendResult, largeScreenCountParam.getSendResult());
queryWrapper.lambda().orderByDesc(MsgEventInfo::getSendTime).between(MsgEventInfo::getSendTime, start, end);
if(StrUtil.isNotBlank(largeScreenCountParam.getSearchValue())){ if(StrUtil.isNotBlank(largeScreenCountParam.getSearchValue())){
lambdaQueryWrapper.and(w->w.like(MsgEventInfo::getUserName,largeScreenCountParam.getSearchValue()).or().like(MsgEventInfo::getPhone,largeScreenCountParam.getSearchValue())); queryWrapper.lambda().and(w->w.like(MsgEventInfo::getUserName,largeScreenCountParam.getSearchValue()).or().like(MsgEventInfo::getPhone,largeScreenCountParam.getSearchValue()));
} }
return msgEventInfoService.page(new Page<>(PageFactory.getPageNum(largeScreenCountParam), PageFactory.getPageSize(largeScreenCountParam)), lambdaQueryWrapper); return msgEventInfoService.page(new Page<>(PageFactory.getPageNum(largeScreenCountParam), PageFactory.getPageSize(largeScreenCountParam)), queryWrapper);
} }
@Override @Override

View File

@@ -0,0 +1,409 @@
package com.njcn.product.event.transientes.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.product.event.config.RedisUtil;
import com.njcn.product.event.devcie.pojo.dto.LedgerBaseInfoDTO;
import com.njcn.product.event.devcie.pojo.po.PqsDeptsline;
import com.njcn.product.event.devcie.service.PqsDeptslineService;
import com.njcn.product.event.transientes.mapper.PqUserLedgerMapper;
import com.njcn.product.event.transientes.mapper.PqUserLineAssMapper;
import com.njcn.product.event.transientes.mapper.PqsEventdetailMapper;
import com.njcn.product.event.transientes.pojo.dto.SmsResponseDTO;
import com.njcn.product.event.transientes.pojo.dto.SmsSendDTO;
import com.njcn.product.event.transientes.pojo.po.*;
import com.njcn.product.event.transientes.service.*;
import com.njcn.product.event.transientes.utils.SmsUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Description:
* Date: 2026/05/29 上午 11:30【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class SendMessageServiceImpl implements SendMessageService {
private final PqUserLineAssMapper pqUserLineAssMapper;
private final PqUserLedgerMapper pqUserLedgerMapper;
private final PqsEventdetailMapper pqsEventdetailMapper;
private final MsgEventInfoService msgEventInfoService;
private final static String NAME_KEY = "LineCache:";
private final RedisUtil redisUtil;
private final ThreadPoolTaskExecutor smsTaskExecutor;
private final PqsUsersetService pqsUsersetService;
private final SmsUtils smsUtils;
private final PqsDeptslineService pqsDeptslineService;
@Value("${SYS_TYPE_ZT}")
private String sysTypeZt;
private final PqsDeptsService pqsDeptsService;
private final PqsUserService pqsUserService;
@Override
public void sendMessage(LocalDateTime threeHourAgo, LocalDateTime now) {
log.info("开始扫描暂态事件时间段:"+threeHourAgo+"-"+now);
//查询重要敏感客户
List<PqUserLedgerPO> poList = pqUserLedgerMapper.selectList(new LambdaQueryWrapper<PqUserLedgerPO>().select(PqUserLedgerPO::getId, PqUserLedgerPO::getCustomerName,PqUserLedgerPO::getIsShow).eq(PqUserLedgerPO::getIsShow, 1));
List<String> userIds = poList.stream().map(PqUserLedgerPO::getId).collect(Collectors.toList());
List<PqUserLineAssPO> pqUserLineAssPOS = pqUserLineAssMapper.selectList(new LambdaQueryWrapper<PqUserLineAssPO>().in(PqUserLineAssPO::getUserIndex, userIds));
List<Integer> lineIds = pqUserLineAssPOS.stream().map(PqUserLineAssPO::getLineIndex).distinct().collect(Collectors.toList());
LambdaQueryWrapper<PqsEventdetail> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.in(PqsEventdetail::getLineid,lineIds).between(PqsEventdetail::getCreateTime,threeHourAgo,now);
List<PqsEventdetail> pqsEventdetails = pqsEventdetailMapper.selectList(lambdaQueryWrapper);
log.info("扫描到敏感客户暂态事件:"+pqsEventdetails.size()+"");
if(!CollectionUtils.isEmpty(pqsEventdetails)){
List<LedgerBaseInfoDTO> ledgerBaseInfoDTOS = (List<LedgerBaseInfoDTO>)redisUtil.getObjectByKey(NAME_KEY + StrUtil.DASHED+"LedgerBaseInfoDTO");
Map<Integer, LedgerBaseInfoDTO> LedgerBaseInfoDTOMap = ledgerBaseInfoDTOS.stream().collect(Collectors.toMap(LedgerBaseInfoDTO::getLineId, Function.identity()));
List<MsgEventInfo> msgEventInfos = msgEventInfoService.lambdaQuery().between(MsgEventInfo::getSendTime, threeHourAgo, now).eq(MsgEventInfo::getSendResult, 1).list();
List<String> successSendEventIds = msgEventInfos.stream().map(MsgEventInfo::getEventIndex).distinct().collect(Collectors.toList());
pqsEventdetails = pqsEventdetails.stream()
.filter(temp -> shouldSendSMS(temp.getEventvalue(), temp.getPersisttime()) && (!successSendEventIds.contains(temp.getEventdetailIndex())))
.collect(Collectors.toList());
log.info("扫描到敏感客户暂态事件过滤后事件:"+pqsEventdetails.size()+"");
if(!CollectionUtils.isEmpty(pqsEventdetails)){
log.info("过滤后事件数:"+pqsEventdetails.size());
List<PqsEventdetail> finalPqsEventdetails = pqsEventdetails;
poList.forEach(temp->{
List<Integer> tempLineIds = pqUserLineAssPOS.stream()
.filter(pqUserLineAssPO -> Objects.equals(pqUserLineAssPO.getUserIndex(), temp.getId()))
.map(PqUserLineAssPO::getLineIndex).collect(Collectors.toList());
if(CollectionUtils.isEmpty(tempLineIds)){
return;
}
List<PqsEventdetail> result = finalPqsEventdetails.stream().filter(pqsEventdetail -> tempLineIds.contains(pqsEventdetail.getLineid())).collect(Collectors.toList());
if(CollectionUtils.isEmpty(result)){
return;
}
//组装台账信息
List<LedgerBaseInfoDTO> tempLedger = ledgerBaseInfoDTOS.stream().filter(line -> tempLineIds.contains(line.getLineId())).collect(Collectors.toList());
Map<String, List<String>> busNameMap = tempLedger.stream().map(LedgerBaseInfoDTO::getBusBarName).collect(Collectors.groupingBy(SendMessageServiceImpl::extractVoltage));
String allBus = busNameMap.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(
entry -> {
return entry.getKey() + "母线" + entry.getValue().size() + "";
}
).collect(Collectors.joining(""));
Map<String, List<String>> subStaionMap = tempLedger.stream().map(LedgerBaseInfoDTO::getStationName).distinct().collect(Collectors.groupingBy(SendMessageServiceImpl::extractVoltage));
String allsubStation = subStaionMap.entrySet().stream()
.sorted(Map.Entry.comparingByKey()) // 按电压等级排序110,220,500
.map(entry -> {
String voltage = entry.getKey(); // 例如 "110kV"
List<String> stations = entry.getValue();
// 构建组内字符串:第一个保留全名,后续的去掉电压前缀
StringBuilder sb = new StringBuilder();
for (int i = 0; i < stations.size(); i++) {
String fullName = stations.get(i);
if (i == 0) {
sb.append(fullName); // 第一个保留完整名称(含电压)
} else {
// 去掉电压前缀(例如 "110kV标厂变" -> "标厂变"
String pureName = fullName.startsWith(voltage)
? fullName.substring(voltage.length())
: fullName;
sb.append("").append(pureName);
}
}
return sb.toString();
})
.collect(Collectors.joining(","));
result.forEach(event->{
event.setBusBarName(LedgerBaseInfoDTOMap.get(event.getLineid()).getBusBarName());
event.setStationName(LedgerBaseInfoDTOMap.get(event.getLineid()).getStationName());
});
Map<String, List<PqsEventdetail>> groupFilterNull = result.stream()
.filter(e -> e.getStationName() != null)
.collect(Collectors.groupingBy(PqsEventdetail::getStationName));
//组装暂降信息
StringBuilder stringBuilder = new StringBuilder(temp.getCustomerName());
stringBuilder.append("上级直供变电站共有相关"+allBus+",");
stringBuilder.append("涉及"+allsubStation+"");
AtomicInteger index = new AtomicInteger();
String eventString = groupFilterNull.entrySet().stream().map(entry -> {
String substationnName = entry.getKey(); // 例如 "110kV智芯变"
List<PqsEventdetail> value = entry.getValue();
StringBuilder sb = new StringBuilder();
if (index.getAndIncrement() == 0) {
sb.append(LocalDateTimeUtil.format(value.get(0).getTimeid(),DatePattern.NORM_DATETIME_MINUTE_PATTERN)).append(",");
}else {
sb.append(LocalDateTimeUtil.format(value.get(0).getTimeid(),"HH:mm")).append(",");
}
if (value.size() == 1) {
BigDecimal eventvalue = new BigDecimal(value.get(0).getEventvalue()).multiply(new BigDecimal(100)).setScale(2, RoundingMode.HALF_UP);
BigDecimal persisttime = new BigDecimal(value.get(0).getPersisttime()).divide(new BigDecimal(1000)).setScale(3, RoundingMode.HALF_UP);
sb.append(substationnName).append(value.get(0).getBusBarName()).append("发生电压暂降,电压跌落至").append(eventvalue).append("%,持续时间:").append(persisttime).append("S");
} else {
// String minTime = LocalDateTimeUtil.format(value.stream()
// .map(PqsEventdetail::getTimeid)
// .min(LocalDateTime::compareTo).get(), DatePattern.NORM_DATETIME_MINUTE_PATTERN);
// String maxTime = LocalDateTimeUtil.format(value.stream()
// .map(eventdetail -> {
// return eventdetail.getTimeid().plus(Duration.ofMillis(eventdetail.getMs().longValue()));
// })
// .max(LocalDateTime::compareTo).get(), DatePattern.NORM_DATETIME_MS_FORMATTER);
// String BusNameList = value.stream().map(PqsEventdetail::getBusBarName).collect(Collectors.joining("、"));
Map<String, List<String>> BusNameMap = value.stream().map(PqsEventdetail::getBusBarName).distinct().collect(Collectors.groupingBy(SendMessageServiceImpl::extractVoltage));
String allBusName = BusNameMap.entrySet().stream()
.sorted(Map.Entry.comparingByKey()) // 按电压等级排序110,220,500
.map(tempEntry -> {
String voltage = tempEntry.getKey(); // 例如 "110kV"
List<String> busNameList = tempEntry.getValue();
// 构建组内字符串:第一个保留全名,后续的去掉电压前缀
StringBuilder busName = new StringBuilder();
for (int i = 0; i < busNameList.size(); i++) {
String fullName = busNameList.get(i).replace("母线","");
if (i == 0) {
busName.append(fullName); // 第一个保留完整名称(含电压)
} else {
// 去掉电压前缀(例如 "10kV3B#母线" -> "3B#"
String pureName = fullName.startsWith(voltage)
? fullName.substring(voltage.length())
: fullName;
busName.append("").append(pureName.replace("母线",""));
}
}
return busName.toString();
})
.collect(Collectors.joining(","));
BigDecimal eventvalueMin = new BigDecimal(value.stream().mapToDouble(PqsEventdetail::getEventvalue).min().getAsDouble())
.multiply(new BigDecimal(100)).setScale(2, RoundingMode.HALF_UP);
BigDecimal eventvalueMax = new BigDecimal(value.stream().mapToDouble(PqsEventdetail::getEventvalue).max().getAsDouble())
.multiply(new BigDecimal(100)).setScale(2, RoundingMode.HALF_UP);
BigDecimal persisttimeMin = new BigDecimal(value.stream().mapToDouble(PqsEventdetail::getPersisttime).min().getAsDouble())
.divide(new BigDecimal(1000)).setScale(3, RoundingMode.HALF_UP);
BigDecimal persisttimeMax = new BigDecimal(value.stream().mapToDouble(PqsEventdetail::getPersisttime).max().getAsDouble())
.divide(new BigDecimal(1000)).setScale(3, RoundingMode.HALF_UP);
sb.append(substationnName).append(allBusName)
.append("母线发生电压暂降,电压跌落至").append(eventvalueMin).append("%—").append(eventvalueMax).append("%,持续时间:")
.append(persisttimeMin).append("S—").append(persisttimeMax).append("S");
}
return sb.toString();
}).collect(Collectors.joining(";"));
stringBuilder.append(eventString).append("");
List<Integer> eventLineIds = result.stream().map(PqsEventdetail::getLineid).collect(Collectors.toList());
List<String> eventdetailIndexs = result.stream().map(PqsEventdetail::getEventdetailIndex).collect(Collectors.toList());
List<PqsDeptsline> pqLineDept = pqsDeptslineService.lambdaQuery().in(PqsDeptsline::getLineIndex, eventLineIds).eq(PqsDeptsline::getSystype, sysTypeZt).list();
if(CollectionUtils.isEmpty(pqLineDept)){
return;
}
Set<String> deptIds = pqLineDept.stream().map(PqsDeptsline::getDeptsIndex).collect(Collectors.toSet());
Set<String> resultIds = getAllParentDeptIds(deptIds);
List<PqsUserSet> pqsUserSetList = pqsUsersetService.lambdaQuery().eq(PqsUserSet::getIsNotice, 1).in(PqsUserSet::getDeptsIndex, resultIds).list();
if (CollUtil.isEmpty(pqsUserSetList)) {
//当前事件未找到用户信息,判断为不需要发送短信用户
return;
}
List<PqsUser> pqsUserList = pqsUserService.lambdaQuery().select(PqsUser::getUserIndex, PqsUser::getPhone, PqsUser::getName).in(PqsUser::getUserIndex, pqsUserSetList.stream().map(PqsUserSet::getUserIndex).collect(Collectors.toList())).list();
if(!CollectionUtils.isEmpty(pqsUserList)){
//开始发送短信
smsTaskExecutor.execute(() -> {
sendMessageforUser(stringBuilder.toString(), pqsUserList,eventdetailIndexs);
});
}
});
}
}
}
private void sendMessageforUser(String content, List<PqsUser> pqsUserList, List<String> eventdetailIndexs) {
//发送短信
List<MsgEventInfo> resultList = new ArrayList<>();
List<SmsSendDTO.ItemInner> msgDTOList = new ArrayList<>();
String message;
if (content.length() > 500) {
message = content.substring(0, 490).concat(";详情请登录电压暂降监测平台查看。");
} else {
message = content.toString();
}
for (PqsUser user : pqsUserList) {
String msgId = IdUtil.simpleUUID();
SmsSendDTO.ItemInner dto = new SmsSendDTO.ItemInner();
dto.setContent(message);
dto.setTo(user.getPhone());
dto.setCustomMsgID(msgId);
msgDTOList.add(dto);
for (String eventdetailIndex : eventdetailIndexs) {
MsgEventInfo msgEventInfo = new MsgEventInfo();
msgEventInfo.setMsgIndex(msgId);
msgEventInfo.setMsgContent(message);
msgEventInfo.setPhone(user.getPhone());
msgEventInfo.setUserId(user.getUserIndex());
msgEventInfo.setUserName(user.getName());
msgEventInfo.setIsHandle(0);
msgEventInfo.setSendResult(0);
msgEventInfo.setSendTime(LocalDateTime.now());
msgEventInfo.setEventIndex(eventdetailIndex);
resultList.add(msgEventInfo);
}
}
List<SmsResponseDTO.SmsItem> result = smsUtils.sendSmSToUser(msgDTOList);
Map<String, SmsResponseDTO.SmsItem> stringSmsItemMap = result.stream().collect(Collectors.toMap(SmsResponseDTO.SmsItem::getCustomMsgID, Function.identity()));
resultList.forEach(item -> {
if (stringSmsItemMap.containsKey(item.getMsgIndex())) {
SmsResponseDTO.SmsItem smsItem = stringSmsItemMap.get(item.getMsgIndex());
item.setSendResult(Objects.equals(smsItem.getCode(), "0") ? 1 : 0);
}
});
msgEventInfoService.saveBatch(resultList);
}
public Set<String> getAllParentDeptIds(Set<String> deptIds) {
// 首次获取直接父级
List<PqsDepts> allDeptList = pqsDeptsService.lambdaQuery().list();
// 递归获取所有父级
Set<String> result = recursivelyGetParentIds(deptIds, allDeptList);
return result;
}
/**
* 递归获取所有父级ID
*
* @param currentParentIds 当前层级的父级ID集合
* @return 所有层级的父级ID集合
*/
private Set<String> recursivelyGetParentIds(Set<String> currentParentIds, List<PqsDepts> allDeptList) {
Set<String> result = new HashSet<>(currentParentIds);
Set<String> nextLevelParentIds = new HashSet<>();
List<PqsDepts> parentDeptList = allDeptList.stream().filter(it -> currentParentIds.contains(it.getDeptsIndex())).collect(Collectors.toList());
for (PqsDepts pqsDepts : parentDeptList) {
if (!pqsDepts.getParentnodeid().equals("0")) {
nextLevelParentIds.add(pqsDepts.getParentnodeid());
}
}
// 如果有更高层级的父级,继续递归
if (!nextLevelParentIds.isEmpty()) {
Set<String> deeperParentIds = recursivelyGetParentIds(nextLevelParentIds, allDeptList);
result.addAll(deeperParentIds);
}
return result;
}
private boolean shouldSendSMS( Double value , Double time ) {
// 条件1: 电压降至50%以下持续时间超过20ms
if (value < 0.5 && time >= 20) {
return true;
}
// 条件2: 电压降至50%—70%持续时间超过200ms
if (value >= 50 && value < 70 && time >= 200) {
return true;
}
// 条件3: 电压降至70%—80%持续时间超过500ms
if (value >= 70 && value < 80 && time >= 500) {
return true;
}
return false;
}
private static String extractVoltage(String name) {
int kVIndex = name.indexOf("kV");
if (kVIndex == -1) {
log.info("存在台账"+name+"不符合命名规范");
throw new IllegalArgumentException("无法解析电压等级: " + name);
}
// 返回从开头到 "kV" 结束的部分(包含 "kV"
return name.substring(0, kVIndex + 2);
}
public static void main(String[] args) {
// Stream.of(1,2,3).collect(Collectors.toList()).forEach(
// temp->{
// if(temp==2){
// return;
// }
// System.out.println(temp);
// }
// );
List<String> substations = Arrays.asList(
"110kV智芯变",
"110kV标厂变",
"110kV科创变",
"220kV开发区变",
"220kV高新变",
"500kV枢纽换流站"
);
// 按电压等级字符串含kV分组
Map<String, List<String>> grouped = substations.stream()
.collect(Collectors.groupingBy(SendMessageServiceImpl::extractVoltage));
String collect = grouped.entrySet().stream()
.sorted(Map.Entry.comparingByKey()) // 按电压等级排序110,220,500
.map(entry -> {
String voltage = entry.getKey(); // 例如 "110kV"
List<String> stations = entry.getValue();
// 构建组内字符串:第一个保留全名,后续的去掉电压前缀
StringBuilder sb = new StringBuilder();
for (int i = 0; i < stations.size(); i++) {
String fullName = stations.get(i);
if (i == 0) {
sb.append(fullName); // 第一个保留完整名称(含电压)
} else {
// 去掉电压前缀(例如 "110kV标厂变" -> "标厂变"
String pureName = fullName.startsWith(voltage)
? fullName.substring(voltage.length())
: fullName;
sb.append("").append(pureName);
}
}
return sb.toString();
})
.collect(Collectors.joining(";"));
System.out.println(collect);
}
}

View File

@@ -40,10 +40,10 @@ business:
wavePath: D://Comtrade wavePath: D://Comtrade
targetPath: /pqmonitor targetPath: /pqmonitor
exportBaseDir: D://exportComtrade exportBaseDir: D://exportComtrade
eventCronExpression: 0 0/10 * * * ? eventCronExpression: 0 0/2 * * * ?
failCronExpression: 0 5/10 * * * ? failCronExpression: 0 5/10 * * * ?
userCronExpression: 0 5 1 * * ? userCronExpression: 0 5 1 * * ?
syncinterval: 10 syncinterval: 2
failsyncinterval: 1440 failsyncinterval: 1440
#wavePath: /usr/local/comtrade #wavePath: /usr/local/comtrade
#处理临时数据 #处理临时数据