Compare commits
4 Commits
2026-03
...
9bb250bdb9
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9bb250bdb9 | ||
|
|
d33b3637a5 | ||
|
|
f50f11b159 | ||
| 30984aa908 |
@@ -223,6 +223,10 @@ public class EventWaveAnalysisServiceImpl implements EventWaveAnalysisService {
|
|||||||
RmpEventDetailPO rmpEventDetailPO = rmpEventAdvanceMapper.selectById(eventIndex);
|
RmpEventDetailPO rmpEventDetailPO = rmpEventAdvanceMapper.selectById(eventIndex);
|
||||||
EntityAdvancedData entityAdvancedData;
|
EntityAdvancedData entityAdvancedData;
|
||||||
|
|
||||||
|
if (Objects.isNull(rmpEventDetailPO.getFileFlag())) {
|
||||||
|
throw new BusinessException("系统检测到波形文件未从装置招到本地,请联系管理员");
|
||||||
|
}
|
||||||
|
|
||||||
if (rmpEventDetailPO.getFileFlag() == 1) {
|
if (rmpEventDetailPO.getFileFlag() == 1) {
|
||||||
//获取所有暂态原因
|
//获取所有暂态原因
|
||||||
List<DictData> dicDataList = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.EVENT_TYPE.getCode()).getData();
|
List<DictData> dicDataList = dicDataFeignClient.getDicDataByTypeCode(DicDataTypeEnum.EVENT_TYPE.getCode()).getData();
|
||||||
|
|||||||
@@ -3,11 +3,9 @@ package com.njcn.event.file.component;
|
|||||||
import cn.hutool.core.date.DatePattern;
|
import cn.hutool.core.date.DatePattern;
|
||||||
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.DateUtil;
|
||||||
import cn.hutool.core.io.IoUtil;
|
import cn.hutool.core.io.IoUtil;
|
||||||
import cn.hutool.core.text.StrPool;
|
|
||||||
import cn.hutool.core.util.ArrayUtil;
|
import cn.hutool.core.util.ArrayUtil;
|
||||||
import cn.hutool.core.util.CharsetUtil;
|
import cn.hutool.core.util.CharsetUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
|
||||||
import com.njcn.common.pojo.exception.BusinessException;
|
import com.njcn.common.pojo.exception.BusinessException;
|
||||||
import com.njcn.common.utils.wave.BitConverter;
|
import com.njcn.common.utils.wave.BitConverter;
|
||||||
import com.njcn.event.file.pojo.dto.*;
|
import com.njcn.event.file.pojo.dto.*;
|
||||||
@@ -511,7 +509,6 @@ public class WaveFileComponent {
|
|||||||
//抽点后新的的采样率
|
//抽点后新的的采样率
|
||||||
List<RateDTO> newLstRate = new ArrayList<>();
|
List<RateDTO> newLstRate = new ArrayList<>();
|
||||||
for (int iRate = 0; iRate < comtradeCfgDTO.getNRates(); iRate++) {
|
for (int iRate = 0; iRate < comtradeCfgDTO.getNRates(); iRate++) {
|
||||||
// if (comtradeCfgDTO.getLstRate().get(iRate).getNOneSample() >= 32) {
|
|
||||||
// 计算本段录波总共有多少波形
|
// 计算本段录波总共有多少波形
|
||||||
nWaveNum = comtradeCfgDTO.getLstRate().get(iRate).getNSampleNum() / comtradeCfgDTO.getLstRate().get(iRate).getNOneSample();
|
nWaveNum = comtradeCfgDTO.getLstRate().get(iRate).getNSampleNum() / comtradeCfgDTO.getLstRate().get(iRate).getNOneSample();
|
||||||
//设置总波形大小
|
//设置总波形大小
|
||||||
@@ -547,12 +544,7 @@ public class WaveFileComponent {
|
|||||||
newLstRate.get(nnInd).setNSampleNum(comtradeCfgDTO.getLstRate().get(iRate).getNOneSample() * nWaveNum);
|
newLstRate.get(nnInd).setNSampleNum(comtradeCfgDTO.getLstRate().get(iRate).getNOneSample() * nWaveNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 正常的配置中采样率
|
|
||||||
/* comtradeCfgDTO.getLstRate().get(nnInd).setNOneSample(comtradeCfgDTO.getLstRate().get(iRate).getNOneSample());
|
|
||||||
comtradeCfgDTO.getLstRate().get(nnInd).setNSampleNum(comtradeCfgDTO.getLstRate().get(iRate).getNSampleNum());*/
|
|
||||||
|
|
||||||
nnInd++;
|
nnInd++;
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
// 偏移量,采样间隔
|
// 偏移量,采样间隔
|
||||||
long nOffSet = 0, nWaveSpan;
|
long nOffSet = 0, nWaveSpan;
|
||||||
@@ -560,7 +552,6 @@ public class WaveFileComponent {
|
|||||||
float fValue, dfValue;
|
float fValue, dfValue;
|
||||||
// 计算不同块的采样率
|
// 计算不同块的采样率
|
||||||
int nIndex = 0;
|
int nIndex = 0;
|
||||||
// 将最低采样率替换到本段录波内
|
|
||||||
// .CFG中采样率
|
// .CFG中采样率
|
||||||
RateDTO tmpRateDTO;
|
RateDTO tmpRateDTO;
|
||||||
// nBlockNum 总循环次数
|
// nBlockNum 总循环次数
|
||||||
@@ -596,26 +587,17 @@ public class WaveFileComponent {
|
|||||||
long allWaveTemp = newLstRate.get(nIndex).getNSampleNum() / newLstRate.get(nIndex).getNOneSample();
|
long allWaveTemp = newLstRate.get(nIndex).getNSampleNum() / newLstRate.get(nIndex).getNOneSample();
|
||||||
// 本段需要补多少点
|
// 本段需要补多少点
|
||||||
long allempSample = newLstRate.get(nIndex).getNOneSample();
|
long allempSample = newLstRate.get(nIndex).getNOneSample();
|
||||||
//int iStartWaveTemp = i ;// 开始补点的起点
|
|
||||||
|
int currentDataIndex = i;
|
||||||
for (int iWaveTemp = 0; iWaveTemp < allWaveTemp; iWaveTemp++) {
|
for (int iWaveTemp = 0; iWaveTemp < allWaveTemp; iWaveTemp++) {
|
||||||
for (int mTempSample = 0; mTempSample < allempSample; mTempSample++) {
|
for (int mTempSample = 0; mTempSample < allempSample; mTempSample++) {
|
||||||
//最多只有半波有效值,也就是每周波是1个或者2个点,然后去补最少16个点
|
|
||||||
if (mTempSample / nWaveSpan == 1 && mTempSample % nWaveSpan == 0) {
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
//存储局部数据集合,包含了时间,A,B,C三相
|
|
||||||
tmpWaveData = new ArrayList<>();
|
tmpWaveData = new ArrayList<>();
|
||||||
for (int j = 0; j < comtradeCfgDTO.getNAnalogNum(); j++) {
|
for (int j = 0; j < comtradeCfgDTO.getNAnalogNum(); j++) {
|
||||||
//数据只有电压ABC三相数据,不展示U0、I0等数据 YXB2020-10-09 去除相别为N相的数据
|
|
||||||
if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzPhasicName().equalsIgnoreCase("N")) {
|
if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzPhasicName().equalsIgnoreCase("N")) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
float fCoef = comtradeCfgDTO.getLstAnalogDTO().get(j).getFCoefficent();
|
float fCoef = comtradeCfgDTO.getLstAnalogDTO().get(j).getFCoefficent();
|
||||||
|
fValue = BitConverter.byte2ToUnsignedShort(datArray, currentDataIndex * nBlockSize + 2 * 4 + j * 2) * fCoef;
|
||||||
if((i * nBlockSize + 2 * 4 + j * 2) == 2437568){
|
|
||||||
System.out.println(55);
|
|
||||||
}
|
|
||||||
fValue = BitConverter.byte2ToUnsignedShort(datArray, i * nBlockSize + 2 * 4 + j * 2) * fCoef;
|
|
||||||
//WW 2019-11-14
|
//WW 2019-11-14
|
||||||
/*************************
|
/*************************
|
||||||
* 1、接口返回的默认是二次值
|
* 1、接口返回的默认是二次值
|
||||||
@@ -668,7 +650,7 @@ public class WaveFileComponent {
|
|||||||
|
|
||||||
//xValue前移量,假如是第一次时候则需要前移
|
//xValue前移量,假如是第一次时候则需要前移
|
||||||
if (!blxValue && j == 0) {
|
if (!blxValue && j == 0) {
|
||||||
xValueAll = (float) (i * 20) / tmpRateDTO.getNOneSample() - comtradeCfgDTO.getNPush();
|
xValueAll = (float) (currentDataIndex * 20) / tmpRateDTO.getNOneSample() - comtradeCfgDTO.getNPush();
|
||||||
blxValue = true;
|
blxValue = true;
|
||||||
//只增加一个xValue的值 //增加时间值
|
//只增加一个xValue的值 //增加时间值
|
||||||
tmpWaveData.add((float) (Math.round(xValueAll * 100)) / 100);
|
tmpWaveData.add((float) (Math.round(xValueAll * 100)) / 100);
|
||||||
@@ -686,7 +668,7 @@ public class WaveFileComponent {
|
|||||||
}
|
}
|
||||||
// 把每个单独的值赋予到整体里面去
|
// 把每个单独的值赋予到整体里面去
|
||||||
if (iWaveTemp < (allWaveTemp - 1)) {
|
if (iWaveTemp < (allWaveTemp - 1)) {
|
||||||
i++;
|
currentDataIndex++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -777,6 +759,304 @@ public class WaveFileComponent {
|
|||||||
return listWaveData;
|
return listWaveData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// private List<List<Float>> getComtradeDat(ComtradeCfgDTO comtradeCfgDTO, InputStream datStream, int iType) {
|
||||||
|
// //返回数据,如果仅仅做展示后期考虑换String类型,降低内存开销
|
||||||
|
// List<List<Float>> listWaveData = new ArrayList<>();
|
||||||
|
// //初始化xValue的值
|
||||||
|
// float xValueAll = 0;
|
||||||
|
// //判断是否首次登陆
|
||||||
|
// boolean blxValue = false;
|
||||||
|
// byte[] datArray;
|
||||||
|
// try {
|
||||||
|
// datArray = IoUtil.readBytes(datStream);
|
||||||
|
// if (ArrayUtil.isEmpty(datArray)) {
|
||||||
|
// throw new BusinessException(WaveFileResponseEnum.DAT_DATA_ERROR);
|
||||||
|
// }
|
||||||
|
// // 计算每个单独的数据块的大小 4个字节的序号 4个字节的时间 2个字节的值
|
||||||
|
// // 示例中的排布是 4个字节的序号 4个字节的时间 UA(2字节) UB(2字节) UC(2字节) IA(2字节) IB(2字节) IC(2字节)
|
||||||
|
// int nDigSize = (comtradeCfgDTO.getNDigitalNum() % 16) > 0 ? (comtradeCfgDTO.getNDigitalNum() / 16 + 1) * 2 : comtradeCfgDTO.getNDigitalNum() / 16 * 2;
|
||||||
|
// int nBlockSize = 2 * Integer.SIZE / 8 + comtradeCfgDTO.getNAnalogNum() * 2 + nDigSize;
|
||||||
|
// // 总长度除以每个块的大小
|
||||||
|
// int nBlockNum = (int)Math.floor(datArray.length / nBlockSize);
|
||||||
|
//
|
||||||
|
// // 获取采样率
|
||||||
|
// int finalSampleRate = getFinalWaveSample(comtradeCfgDTO.getLstRate(), iType);
|
||||||
|
// if (finalSampleRate != -1) {
|
||||||
|
// //设置最终采样率
|
||||||
|
// comtradeCfgDTO.setFinalSampleRate(finalSampleRate);
|
||||||
|
// // 计算转换后的采样率
|
||||||
|
// int nnInd = 0;
|
||||||
|
// // 抽点后总共多少点数据
|
||||||
|
// int nWaveNum;
|
||||||
|
// //抽点后新的的采样率
|
||||||
|
// List<RateDTO> newLstRate = new ArrayList<>();
|
||||||
|
// for (int iRate = 0; iRate < comtradeCfgDTO.getNRates(); iRate++) {
|
||||||
|
//// if (comtradeCfgDTO.getLstRate().get(iRate).getNOneSample() >= 32) {
|
||||||
|
// // 计算本段录波总共有多少波形
|
||||||
|
// nWaveNum = comtradeCfgDTO.getLstRate().get(iRate).getNSampleNum() / comtradeCfgDTO.getLstRate().get(iRate).getNOneSample();
|
||||||
|
// //设置总波形大小
|
||||||
|
// comtradeCfgDTO.setNAllWaveNum(comtradeCfgDTO.getNAllWaveNum() + nWaveNum);
|
||||||
|
// // 将最低采样率替换到本段录波内
|
||||||
|
// RateDTO tmpRateDTO = new RateDTO();
|
||||||
|
// // 有效值标志,如果是有效值,那么就需要反向补点,而不是抽点
|
||||||
|
// if (comtradeCfgDTO.getLstRate().get(iRate).getNOneSample() >= 32) {
|
||||||
|
// //YXB 2025-08-27
|
||||||
|
// tmpRateDTO.bRMSFlag = false;
|
||||||
|
// }
|
||||||
|
// //如果采样是全波有效值或者半波有效值,需要去补足周波点数 YXB 2025-08-27
|
||||||
|
// else if (comtradeCfgDTO.getLstRate().get(iRate).getNOneSample() <= 2) {
|
||||||
|
// //YXB 2025-08-27
|
||||||
|
// tmpRateDTO.bRMSFlag = true;
|
||||||
|
// }
|
||||||
|
// newLstRate.add(tmpRateDTO);
|
||||||
|
// //iFlag =3 一定不进行抽点算法
|
||||||
|
// if (iType != 3) {
|
||||||
|
// //true 抽点算法(当前采样率跟统一采样率不一样则是抽点,否则是未抽点)
|
||||||
|
// if (!Objects.equals(comtradeCfgDTO.getLstRate().get(iRate).getNOneSample(), comtradeCfgDTO.getFinalSampleRate())) {
|
||||||
|
// newLstRate.get(nnInd).setNOneSample(comtradeCfgDTO.getFinalSampleRate());
|
||||||
|
// // 计算本段录波按照最低采样点应该有多少录波
|
||||||
|
// newLstRate.get(nnInd).setNSampleNum(comtradeCfgDTO.getFinalSampleRate() * nWaveNum);
|
||||||
|
// } else {
|
||||||
|
// newLstRate.get(nnInd).setNOneSample(comtradeCfgDTO.getLstRate().get(iRate).getNOneSample());
|
||||||
|
// // 计算本段录波按照最低采样点应该有多少录波
|
||||||
|
// newLstRate.get(nnInd).setNSampleNum(comtradeCfgDTO.getLstRate().get(iRate).getNOneSample() * nWaveNum);
|
||||||
|
// }
|
||||||
|
// } else {
|
||||||
|
// newLstRate.get(nnInd).setNOneSample(comtradeCfgDTO.getLstRate().get(iRate).getNOneSample());
|
||||||
|
// // 计算本段录波按照最低采样点应该有多少录波
|
||||||
|
// newLstRate.get(nnInd).setNSampleNum(comtradeCfgDTO.getLstRate().get(iRate).getNOneSample() * nWaveNum);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // 正常的配置中采样率
|
||||||
|
// /* comtradeCfgDTO.getLstRate().get(nnInd).setNOneSample(comtradeCfgDTO.getLstRate().get(iRate).getNOneSample());
|
||||||
|
// comtradeCfgDTO.getLstRate().get(nnInd).setNSampleNum(comtradeCfgDTO.getLstRate().get(iRate).getNSampleNum());*/
|
||||||
|
//
|
||||||
|
// nnInd++;
|
||||||
|
//// }
|
||||||
|
// }
|
||||||
|
// // 偏移量,采样间隔
|
||||||
|
// long nOffSet = 0, nWaveSpan;
|
||||||
|
// //两个点之间的时间差
|
||||||
|
// float fValue, dfValue;
|
||||||
|
// // 计算不同块的采样率
|
||||||
|
// int nIndex = 0;
|
||||||
|
// // 将最低采样率替换到本段录波内
|
||||||
|
// // .CFG中采样率
|
||||||
|
// RateDTO tmpRateDTO;
|
||||||
|
// // nBlockNum 总循环次数
|
||||||
|
// for (int i = 0; i < nBlockNum; i++) {
|
||||||
|
// tmpRateDTO = comtradeCfgDTO.getLstRate().get(nIndex);
|
||||||
|
// // 判断是否进入下一段
|
||||||
|
// if (i == tmpRateDTO.getNSampleNum() + nOffSet) {
|
||||||
|
// nOffSet += tmpRateDTO.getNSampleNum();
|
||||||
|
// nIndex++;
|
||||||
|
// if (nIndex == nnInd) {
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// tmpRateDTO = comtradeCfgDTO.getLstRate().get(nIndex);
|
||||||
|
// //YXB 2025-08-27 如果是有效值,那么需要去补点,而不是抽点
|
||||||
|
// if (newLstRate.get(nIndex).bRMSFlag == true) {
|
||||||
|
// //计算本段补点采样间隔
|
||||||
|
// nWaveSpan = newLstRate.get(nIndex).getNOneSample() / tmpRateDTO.getNOneSample();
|
||||||
|
// } else {
|
||||||
|
// // 计算本段抽点采样间隔
|
||||||
|
// nWaveSpan = tmpRateDTO.getNOneSample() / newLstRate.get(nIndex).getNOneSample();
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// dfValue = (float) 20 / tmpRateDTO.getNOneSample();
|
||||||
|
// // 判断是否到了需要抽的采样点
|
||||||
|
// if (i % nWaveSpan == 0) {
|
||||||
|
// // 计算每个通道的值
|
||||||
|
// //存储局部数据集合,包含了时间,A,B,C三相
|
||||||
|
// List<Float> tmpWaveData = new ArrayList<>();
|
||||||
|
// //YXB 2025-08-27 如果是有效值,那么需要去补点,而不是抽点
|
||||||
|
// if (newLstRate.get(nIndex).bRMSFlag == true) {
|
||||||
|
// // 计算有多少个周波
|
||||||
|
// long allWaveTemp = newLstRate.get(nIndex).getNSampleNum() / newLstRate.get(nIndex).getNOneSample();
|
||||||
|
// // 本段需要补多少点
|
||||||
|
// long allempSample = newLstRate.get(nIndex).getNOneSample();
|
||||||
|
// //int iStartWaveTemp = i ;// 开始补点的起点
|
||||||
|
// for (int iWaveTemp = 0; iWaveTemp < allWaveTemp; iWaveTemp++) {
|
||||||
|
// for (int mTempSample = 0; mTempSample < allempSample; mTempSample++) {
|
||||||
|
// //最多只有半波有效值,也就是每周波是1个或者2个点,然后去补最少16个点
|
||||||
|
// if (mTempSample / nWaveSpan == 1 && mTempSample % nWaveSpan == 0) {
|
||||||
|
// i++;
|
||||||
|
// }
|
||||||
|
// //存储局部数据集合,包含了时间,A,B,C三相
|
||||||
|
// tmpWaveData = new ArrayList<>();
|
||||||
|
// for (int j = 0; j < comtradeCfgDTO.getNAnalogNum(); j++) {
|
||||||
|
// //数据只有电压ABC三相数据,不展示U0、I0等数据 YXB2020-10-09 去除相别为N相的数据
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzPhasicName().equalsIgnoreCase("N")) {
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
// float fCoef = comtradeCfgDTO.getLstAnalogDTO().get(j).getFCoefficent();
|
||||||
|
//
|
||||||
|
// if((i * nBlockSize + 2 * 4 + j * 2) == 2437568){
|
||||||
|
// System.out.println(55);
|
||||||
|
// }
|
||||||
|
// fValue = BitConverter.byte2ToUnsignedShort(datArray, i * nBlockSize + 2 * 4 + j * 2) * fCoef;
|
||||||
|
// //WW 2019-11-14
|
||||||
|
// /*************************
|
||||||
|
// * 1、接口返回的默认是二次值
|
||||||
|
// * 2、P是一次值 S是二次值
|
||||||
|
// * 3、S(二次值)情况下:
|
||||||
|
// * ①、单位为"V"时候则直接等于;
|
||||||
|
// * ②、单位为"kV"时候需要乘以1000
|
||||||
|
// * 4、P(一次值)情况下:
|
||||||
|
// * ①、单位为"V"时候则直接等于;
|
||||||
|
// * ②、单位为"kV"时候需要乘以1000
|
||||||
|
// *************************/
|
||||||
|
// //P是一次值 S是二次值
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzValueType().equalsIgnoreCase("S")) {
|
||||||
|
// //判断单位是V还是kV
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzUnitName().equalsIgnoreCase("KV")) {
|
||||||
|
// fValue = fValue * 1000.0f;
|
||||||
|
// } else {
|
||||||
|
// fValue = fValue;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// //P是一次值 S是二次值
|
||||||
|
// else if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzValueType().equalsIgnoreCase("P")) {
|
||||||
|
// //判断单位是V还是kV
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzUnitName().equalsIgnoreCase("V")) {
|
||||||
|
// //根据cfg内的变比,将一次值转换成二次值
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary() != 0.0f) {
|
||||||
|
// fValue = fValue * comtradeCfgDTO.getLstAnalogDTO().get(j).getFSecondary() / comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary();
|
||||||
|
// } else {
|
||||||
|
// fValue = fValue;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// //判断单位是V还是kV
|
||||||
|
// else if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzUnitName().equalsIgnoreCase("KV")) {
|
||||||
|
// //根据cfg内的变比,将一次值转换成二次值
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary() != 0.0f) {
|
||||||
|
// fValue = fValue * 1000.0f * comtradeCfgDTO.getLstAnalogDTO().get(j).getFSecondary() / comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary();
|
||||||
|
// } else {
|
||||||
|
// fValue = fValue;
|
||||||
|
// }
|
||||||
|
// } else //还有可能是 电流,单位是A
|
||||||
|
// {
|
||||||
|
// //根据cfg内的变比,将一次值转换成二次值
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary() != 0.0f) {
|
||||||
|
// fValue = comtradeCfgDTO.getLstAnalogDTO().get(j).getFSecondary() / comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary();
|
||||||
|
// } else {
|
||||||
|
// fValue = fValue;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// //xValue前移量,假如是第一次时候则需要前移
|
||||||
|
// if (!blxValue && j == 0) {
|
||||||
|
// xValueAll = (float) (i * 20) / tmpRateDTO.getNOneSample() - comtradeCfgDTO.getNPush();
|
||||||
|
// blxValue = true;
|
||||||
|
// //只增加一个xValue的值 //增加时间值
|
||||||
|
// tmpWaveData.add((float) (Math.round(xValueAll * 100)) / 100);
|
||||||
|
// } else if (j == 0) {
|
||||||
|
// xValueAll += (float) dfValue / nWaveSpan;
|
||||||
|
// //只增加一个xValue的值 //增加时间值
|
||||||
|
// tmpWaveData.add((float) (Math.round(xValueAll * 100)) / 100);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// //不同通道yValue的值都需要增加,最终成ABC三相 //每个通道的值
|
||||||
|
// tmpWaveData.add((float) (Math.round(fValue * 100)) / 100);
|
||||||
|
// }
|
||||||
|
// //把每个单独的值赋予到整体里面去
|
||||||
|
// listWaveData.add(tmpWaveData);
|
||||||
|
// }
|
||||||
|
// // 把每个单独的值赋予到整体里面去
|
||||||
|
// if (iWaveTemp < (allWaveTemp - 1)) {
|
||||||
|
// i++;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// } else {
|
||||||
|
// for (int j = 0; j < comtradeCfgDTO.getNAnalogNum(); j++) {
|
||||||
|
// //数据只有电压ABC三相数据,不展示U0、I0等数据 YXB2020-10-09 去除相别为N相的数据
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzPhasicName().equalsIgnoreCase("N")) {
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// float fCoef = comtradeCfgDTO.getLstAnalogDTO().get(j).getFCoefficent();
|
||||||
|
// fValue = BitConverter.byte2ToUnsignedShort(datArray, i * nBlockSize + 2 * 4 + j * 2) * fCoef;
|
||||||
|
//
|
||||||
|
// //WW 2019-11-14
|
||||||
|
// /**************************
|
||||||
|
// * 1、接口返回的默认是二次值
|
||||||
|
// * 2、P是一次值 S是二次值
|
||||||
|
// * 3、S(二次值)情况下:
|
||||||
|
// * ①、单位为"V"时候则直接等于;
|
||||||
|
// * ②、单位为"kV"时候需要乘以1000
|
||||||
|
// * 4、P(一次值)情况下:
|
||||||
|
// * ①、单位为"V"时候则直接等于;
|
||||||
|
// * ②、单位为"kV"时候需要乘以1000
|
||||||
|
// **************************/
|
||||||
|
// //P是一次值 S是二次值
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzValueType().equalsIgnoreCase("S")) {
|
||||||
|
// //判断单位是V还是kV
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzUnitName().equalsIgnoreCase("KV")) {
|
||||||
|
// fValue = fValue * 1000.0f;
|
||||||
|
// } else {
|
||||||
|
// fValue = fValue;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// //P是一次值 S是二次值
|
||||||
|
// else if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzValueType().equalsIgnoreCase("P")) {
|
||||||
|
// //判断单位是V还是kV
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzUnitName().equalsIgnoreCase("V")) {
|
||||||
|
// //根据cfg内的变比,将一次值转换成二次值
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary() != 0.0f) {
|
||||||
|
// fValue = fValue * comtradeCfgDTO.getLstAnalogDTO().get(j).getFSecondary() / comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary();
|
||||||
|
// } else {
|
||||||
|
// fValue = fValue;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// //判断单位是V还是kV
|
||||||
|
// else if (comtradeCfgDTO.getLstAnalogDTO().get(j).getSzUnitName().equalsIgnoreCase("KV")) {
|
||||||
|
// //根据cfg内的变比,将一次值转换成二次值
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary() != 0.0f) {
|
||||||
|
// fValue = fValue * 1000.0f * comtradeCfgDTO.getLstAnalogDTO().get(j).getFSecondary() / comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary();
|
||||||
|
// } else {
|
||||||
|
// fValue = fValue;
|
||||||
|
// }
|
||||||
|
// } else //还有可能是 电流,单位是A
|
||||||
|
// {
|
||||||
|
// //根据cfg内的变比,将一次值转换成二次值
|
||||||
|
// if (comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary() != 0.0f) {
|
||||||
|
// fValue = comtradeCfgDTO.getLstAnalogDTO().get(j).getFSecondary() / comtradeCfgDTO.getLstAnalogDTO().get(j).getFPrimary();
|
||||||
|
// } else {
|
||||||
|
// fValue = fValue;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// //xValue前移量,假如是第一次时候则需要前移
|
||||||
|
// if (!blxValue && j == 0) {
|
||||||
|
// xValueAll = (float) (i * 20) / tmpRateDTO.getNOneSample() - comtradeCfgDTO.getNPush();
|
||||||
|
// blxValue = true;
|
||||||
|
// //只增加一个xValue的值 //增加时间值
|
||||||
|
// tmpWaveData.add((float) (Math.round(xValueAll * 100)) / 100);
|
||||||
|
// } else if (j == 0) {
|
||||||
|
// xValueAll += (float) nWaveSpan * dfValue;
|
||||||
|
// //只增加一个xValue的值 //增加时间值
|
||||||
|
// tmpWaveData.add((float) (Math.round(xValueAll * 100)) / 100);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// //不同通道yValue的值都需要增加,最终成ABC三相 //每个通道的值
|
||||||
|
// tmpWaveData.add((float) (Math.round(fValue * 100)) / 100);
|
||||||
|
// }
|
||||||
|
// //把每个单独的值赋予到整体里面去
|
||||||
|
// listWaveData.add(tmpWaveData);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// } catch (Exception e) {
|
||||||
|
// e.printStackTrace();
|
||||||
|
// throw new BusinessException(WaveFileResponseEnum.DAT_DATA_ERROR);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return listWaveData;
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
/*********************************
|
/*********************************
|
||||||
* 获取最小(最终)采样率方法
|
* 获取最小(最终)采样率方法
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import com.njcn.device.node.mapper.NodeMapper;
|
|||||||
import com.njcn.device.pq.pojo.po.Device;
|
import com.njcn.device.pq.pojo.po.Device;
|
||||||
import com.njcn.device.pq.pojo.po.DeviceProcess;
|
import com.njcn.device.pq.pojo.po.DeviceProcess;
|
||||||
import com.njcn.device.pq.pojo.po.Node;
|
import com.njcn.device.pq.pojo.po.Node;
|
||||||
|
import com.njcn.message.constant.RedisKeyPrefix;
|
||||||
import com.njcn.redis.utils.RedisUtil;
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
@@ -21,6 +22,7 @@ import org.springframework.util.CollectionUtils;
|
|||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -63,8 +65,17 @@ public class DeviceComflagTasks {
|
|||||||
pqsCommunicateDto.setTime(LocalDateTimeUtil.now().format(DatePattern.NORM_DATETIME_FORMATTER));
|
pqsCommunicateDto.setTime(LocalDateTimeUtil.now().format(DatePattern.NORM_DATETIME_FORMATTER));
|
||||||
pqsCommunicateDto.setDevId(temp);
|
pqsCommunicateDto.setDevId(temp);
|
||||||
pqsCommunicateDto.setType(0);
|
pqsCommunicateDto.setType(0);
|
||||||
|
//获取之前设备状态
|
||||||
|
String devFalg =redisUtil.getStringByKey(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(temp));
|
||||||
|
|
||||||
|
if(StringUtils.isBlank(devFalg)||(!Objects.equals(Integer.valueOf(devFalg),pqsCommunicateDto.getType()))){
|
||||||
|
//状态翻转
|
||||||
|
redisUtil.saveByKey(RedisKeyPrefix.DEVICE_RUN_FLAG.concat(temp),pqsCommunicateDto.getType()+"");
|
||||||
|
pqsCommunicateFeignClient.insertion(pqsCommunicateDto) ;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
pqsCommunicateFeignClient.insertion(pqsCommunicateDto) ;
|
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -143,7 +143,7 @@ public class AreaInfoServiceImpl implements AreaInfoService {
|
|||||||
)
|
)
|
||||||
.ge(StringUtils.isNotBlank(deviceInfoParam.getSearchBeginTime()), RmpEventDetailPO::getStartTime, DateUtil.beginOfDay(DateUtil.parse(deviceInfoParam.getSearchBeginTime())))
|
.ge(StringUtils.isNotBlank(deviceInfoParam.getSearchBeginTime()), RmpEventDetailPO::getStartTime, DateUtil.beginOfDay(DateUtil.parse(deviceInfoParam.getSearchBeginTime())))
|
||||||
.le(StringUtils.isNotBlank(deviceInfoParam.getSearchEndTime()), RmpEventDetailPO::getStartTime, DateUtil.endOfDay(DateUtil.parse(deviceInfoParam.getSearchEndTime())))
|
.le(StringUtils.isNotBlank(deviceInfoParam.getSearchEndTime()), RmpEventDetailPO::getStartTime, DateUtil.endOfDay(DateUtil.parse(deviceInfoParam.getSearchEndTime())))
|
||||||
.orderByDesc(RmpEventDetailPO::getStartTime));
|
.orderByDesc(RmpEventDetailPO::getStartTime).last("limit 100"));
|
||||||
EventDetailNew eventDetailNew;
|
EventDetailNew eventDetailNew;
|
||||||
for (RmpEventDetailPO eventDetail : eventDetails) {
|
for (RmpEventDetailPO eventDetail : eventDetails) {
|
||||||
eventDetailNew = BeanUtil.copyProperties(eventDetail, EventDetailNew.class);
|
eventDetailNew = BeanUtil.copyProperties(eventDetail, EventDetailNew.class);
|
||||||
|
|||||||
@@ -36,6 +36,12 @@
|
|||||||
<scope>compile</scope>
|
<scope>compile</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||||
|
<version>2.7.12</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import cn.hutool.core.util.StrUtil;
|
|||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
//import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||||
import com.njcn.advance.api.EventCauseFeignClient;
|
import com.njcn.advance.api.EventCauseFeignClient;
|
||||||
import com.njcn.advance.pojo.dto.EventAnalysisDTO;
|
import com.njcn.advance.pojo.dto.EventAnalysisDTO;
|
||||||
import com.njcn.common.utils.PubUtils;
|
import com.njcn.common.utils.PubUtils;
|
||||||
@@ -18,6 +18,7 @@ import com.njcn.device.pq.pojo.po.DeptLine;
|
|||||||
import com.njcn.device.pq.pojo.vo.AreaLineInfoVO;
|
import com.njcn.device.pq.pojo.vo.AreaLineInfoVO;
|
||||||
import com.njcn.device.pq.pojo.vo.LineDetailDataVO;
|
import com.njcn.device.pq.pojo.vo.LineDetailDataVO;
|
||||||
import com.njcn.event.common.mapper.RmpEventDetailMapper;
|
import com.njcn.event.common.mapper.RmpEventDetailMapper;
|
||||||
|
import com.njcn.event.common.websocket.WebSocketServer;
|
||||||
import com.njcn.event.pojo.vo.SendEventVO;
|
import com.njcn.event.pojo.vo.SendEventVO;
|
||||||
import com.njcn.event.utils.EventUtil;
|
import com.njcn.event.utils.EventUtil;
|
||||||
import com.njcn.event.pojo.dto.EventDeatilDTO;
|
import com.njcn.event.pojo.dto.EventDeatilDTO;
|
||||||
@@ -29,7 +30,9 @@ import com.njcn.system.api.DicDataFeignClient;
|
|||||||
import com.njcn.system.enums.DicDataEnum;
|
import com.njcn.system.enums.DicDataEnum;
|
||||||
import com.njcn.system.pojo.po.DictData;
|
import com.njcn.system.pojo.po.DictData;
|
||||||
import com.njcn.user.api.DeptFeignClient;
|
import com.njcn.user.api.DeptFeignClient;
|
||||||
|
import com.njcn.user.api.UserFeignClient;
|
||||||
import com.njcn.user.pojo.po.Dept;
|
import com.njcn.user.pojo.po.Dept;
|
||||||
|
import com.njcn.user.pojo.po.User;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.influxdb.dto.QueryResult;
|
import org.influxdb.dto.QueryResult;
|
||||||
@@ -44,6 +47,7 @@ import java.math.RoundingMode;
|
|||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author denghuajun
|
* @author denghuajun
|
||||||
@@ -57,11 +61,14 @@ public class EventDetailServiceImpl extends ServiceImpl<RmpEventDetailMapper, Rm
|
|||||||
|
|
||||||
private final InfluxDbUtils influxDbUtils;
|
private final InfluxDbUtils influxDbUtils;
|
||||||
private final DicDataFeignClient dicDataFeignClient;
|
private final DicDataFeignClient dicDataFeignClient;
|
||||||
private final MqttPublisher publisher;
|
// private final MqttPublisher publisher;
|
||||||
|
|
||||||
|
private final WebSocketServer webSocketServer;
|
||||||
private final CommTerminalGeneralClient commTerminalGeneralClient;
|
private final CommTerminalGeneralClient commTerminalGeneralClient;
|
||||||
private final LineFeignClient lineFeignClient;
|
private final LineFeignClient lineFeignClient;
|
||||||
private final DeptLineFeignClient deptLineFeignClient;
|
private final DeptLineFeignClient deptLineFeignClient;
|
||||||
private final DeptFeignClient deptFeignClient;
|
private final DeptFeignClient deptFeignClient;
|
||||||
|
private final UserFeignClient userFeignClient;
|
||||||
|
|
||||||
private final EventCauseFeignClient eventCauseFeignClient;
|
private final EventCauseFeignClient eventCauseFeignClient;
|
||||||
@Override
|
@Override
|
||||||
@@ -306,6 +313,9 @@ public class EventDetailServiceImpl extends ServiceImpl<RmpEventDetailMapper, Rm
|
|||||||
String[] idsArray = deptInfo.getPids().split(",");
|
String[] idsArray = deptInfo.getPids().split(",");
|
||||||
dept.addAll(Arrays.asList(idsArray));
|
dept.addAll(Arrays.asList(idsArray));
|
||||||
});
|
});
|
||||||
|
List<String> deptList = dept.stream().collect(Collectors.toList());
|
||||||
|
|
||||||
|
List<User> data = userFeignClient.getUserInfoByDeptIds(deptList).getData();
|
||||||
SendEventVO vo = new SendEventVO();
|
SendEventVO vo = new SendEventVO();
|
||||||
vo.setDeptList(dept);
|
vo.setDeptList(dept);
|
||||||
vo.setTime(po.getStartTime());
|
vo.setTime(po.getStartTime());
|
||||||
@@ -317,7 +327,10 @@ public class EventDetailServiceImpl extends ServiceImpl<RmpEventDetailMapper, Rm
|
|||||||
vo.setLineName(lineInfoVOList.get(0).getLineName());
|
vo.setLineName(lineInfoVOList.get(0).getLineName());
|
||||||
vo.setPowerCompany(lineInfoVOList.get(0).getGdName());
|
vo.setPowerCompany(lineInfoVOList.get(0).getGdName());
|
||||||
vo.setSubstation(lineInfoVOList.get(0).getSubName());
|
vo.setSubstation(lineInfoVOList.get(0).getSubName());
|
||||||
publisher.send("/sendEvent", PubUtils.obj2json(vo), 1, false);
|
for (User datum : data) {
|
||||||
|
webSocketServer.sendMessageToUser(datum.getId(),PubUtils.obj2json(vo));
|
||||||
|
}
|
||||||
|
// publisher.send("/sendEvent", PubUtils.obj2json(vo), 1, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,41 @@
|
|||||||
|
package com.njcn.event.common.websocket;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
|
||||||
|
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/12/13 15:09【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class WebSocketConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ServerEndpointExporter serverEndpointExporter() {
|
||||||
|
return new ServerEndpointExporter();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 通信文本消息和二进制缓存区大小
|
||||||
|
* 避免对接 第三方 报文过大时,Websocket 1009 错误
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ServletServerContainerFactoryBean createWebSocketContainer() {
|
||||||
|
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
|
||||||
|
// 在此处设置bufferSize
|
||||||
|
container.setMaxTextMessageBufferSize(10240000);
|
||||||
|
container.setMaxBinaryMessageBufferSize(10240000);
|
||||||
|
container.setMaxSessionIdleTimeout(15 * 60000L);
|
||||||
|
return container;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,147 @@
|
|||||||
|
package com.njcn.event.common.websocket;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.websocket.*;
|
||||||
|
import javax.websocket.server.PathParam;
|
||||||
|
import javax.websocket.server.ServerEndpoint;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
* Date: 2024/12/13 15:11【需求编号】
|
||||||
|
*
|
||||||
|
* @author clam
|
||||||
|
* @version V1.0.0
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@ServerEndpoint(value = "/event/{userId}")
|
||||||
|
public class WebSocketServer {
|
||||||
|
|
||||||
|
private static final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<>();
|
||||||
|
private static final ConcurrentHashMap<String, Long> lastHeartbeatTime = new ConcurrentHashMap<>();
|
||||||
|
private static final ConcurrentHashMap<String, ScheduledExecutorService> heartbeatExecutors = new ConcurrentHashMap<>();
|
||||||
|
private static final long HEARTBEAT_TIMEOUT = 60; // 60秒超时
|
||||||
|
|
||||||
|
@OnOpen
|
||||||
|
public void onOpen(Session session, @PathParam("userId") String userId) {
|
||||||
|
if (StrUtil.isNotBlank(userId)) {
|
||||||
|
sessions.put(userId, session);
|
||||||
|
lastHeartbeatTime.put(userId, System.currentTimeMillis());
|
||||||
|
sendMessage(session, "连接成功");
|
||||||
|
System.out.println("用户 " + userId + " 已连接");
|
||||||
|
|
||||||
|
// 启动心跳检测
|
||||||
|
startHeartbeat(session, userId);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "用户ID不能为空"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnMessage
|
||||||
|
public void onMessage(String message, Session session, @PathParam("userId") String userId) {
|
||||||
|
if ("alive".equalsIgnoreCase(message)) {
|
||||||
|
// 更新最后心跳时间
|
||||||
|
lastHeartbeatTime.put(userId, System.currentTimeMillis());
|
||||||
|
sendMessage(session, "over");
|
||||||
|
} else {
|
||||||
|
// 处理业务消息
|
||||||
|
System.out.println("收到用户 " + userId + " 的消息: " + message);
|
||||||
|
// TODO: 处理业务逻辑
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnClose
|
||||||
|
public void onClose(Session session, CloseReason closeReason, @PathParam("userId") String userId) {
|
||||||
|
// 移除用户并取消心跳检测
|
||||||
|
sessions.remove(userId);
|
||||||
|
lastHeartbeatTime.remove(userId);
|
||||||
|
ScheduledExecutorService executor = heartbeatExecutors.remove(userId);
|
||||||
|
if (executor != null) {
|
||||||
|
executor.shutdownNow();
|
||||||
|
}
|
||||||
|
System.out.println("用户 " + userId + " 已断开连接,状态码: " + closeReason.getCloseCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnError
|
||||||
|
public void onError(Session session, Throwable throwable, @PathParam("userId") String userId) {
|
||||||
|
System.out.println("用户 " + userId + " 发生错误: " + throwable.getMessage());
|
||||||
|
try {
|
||||||
|
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "发生错误"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendMessageToUser(String userId, String message) {
|
||||||
|
Session session = sessions.get(userId);
|
||||||
|
if (session != null && session.isOpen()) {
|
||||||
|
try {
|
||||||
|
session.getBasicRemote().sendText(message);
|
||||||
|
} catch (IOException e) {
|
||||||
|
System.out.println("发送消息给用户 " + userId + " 失败: " + e.getMessage());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
System.out.println("webSocket用户 " + userId + " 不在线或会话已关闭");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
public void sendMessageToAll(String message) {
|
||||||
|
sessions.forEach((userId, session) -> {
|
||||||
|
System.out.println("给用户推送消息" + userId);
|
||||||
|
if (session.isOpen()) {
|
||||||
|
synchronized (lock) {
|
||||||
|
try {
|
||||||
|
session.getBasicRemote().sendText(message);
|
||||||
|
} catch (IOException e) {
|
||||||
|
System.out.println("发送消息给用户 " + userId + " 失败: " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendMessage(Session session, String message) {
|
||||||
|
try {
|
||||||
|
session.getBasicRemote().sendText(message);
|
||||||
|
} catch (IOException e) {
|
||||||
|
System.out.println("发送消息失败: " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startHeartbeat(Session session, String userId) {
|
||||||
|
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
heartbeatExecutors.put(userId, executor);
|
||||||
|
|
||||||
|
// 定期检查心跳
|
||||||
|
executor.scheduleAtFixedRate(() -> {
|
||||||
|
long lastTime = lastHeartbeatTime.getOrDefault(userId, 0L);
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// 如果超过30秒没有收到心跳
|
||||||
|
if (currentTime - lastTime > HEARTBEAT_TIMEOUT * 1000) {
|
||||||
|
try {
|
||||||
|
System.out.println("用户 " + userId + " 心跳超时,关闭连接");
|
||||||
|
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
System.out.println("关闭用户 " + userId + " 连接时出错: " + e.getMessage());
|
||||||
|
}
|
||||||
|
executor.shutdown();
|
||||||
|
heartbeatExecutors.remove(userId);
|
||||||
|
}
|
||||||
|
}, 0, 5, TimeUnit.SECONDS); // 每5秒检查一次
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user