@@ -13,6 +13,7 @@ import com.njcn.csharmonic.service.ILineTargetService;
import com.njcn.csharmonic.service.StableDataService ;
import com.njcn.csharmonic.service.TemperatureService ;
import com.njcn.influx.pojo.dto.StatisticalDataDTO ;
import com.njcn.redis.utils.RedisUtil ;
import com.njcn.system.api.CsStatisticalSetFeignClient ;
import com.njcn.system.pojo.po.EleEpdPqd ;
import lombok.AllArgsConstructor ;
@@ -47,6 +48,8 @@ public class MqttMessageHandler {
private final CsStatisticalSetFeignClient csStatisticalSetFeignClient ;
private final StableDataService stableDataService ;
private final RedisUtil redisUtil ;
private final TemperatureService temperatureService ;
private final DecimalFormat df = new DecimalFormat ( " #0.000 " ) ;
/**
@@ -74,122 +77,129 @@ public class MqttMessageHandler {
*/
@MqttSubscribe ( value = " /zl/askDevData/{devId} " , qos = 1 )
public void responseTopoData ( String topic , @NamedValue ( " devId " ) String devId , MqttMessage message , @Payload String payload ) {
List < ThdDataVO > result = new ArrayList < > ( ) ;
List < ThdDataVO > tempList = new ArrayList < > ( ) ;
ExecutorService executorService = new ThreadPoolExecutor ( 40 , 400 ,
1 , TimeUnit . MINUTES , new ArrayBlockingQueue < > ( 100 , true ) ,
Executors . defaultThreadFactory ( ) , new ThreadPoolExecutor . AbortPolicy ( ) ) ;
List < Future < List < ThdDataVO > > > resultList = new ArrayList < Future < List < ThdDataVO > > > ( ) ;
String topoDataJson = redisUtil . getStringByKey ( devId ) ;
if ( StringUtils . isEmpty ( topoDataJson ) ) {
List < ThdDataVO > result = new ArrayList < > ( ) ;
List < ThdDataVO > tempList = new ArrayList < > ( ) ;
ExecutorService executorService = new ThreadPoolExecutor ( 10 , 100 ,
1 , TimeUnit . MINUTES , new ArrayBlockingQueue < > ( 100 , true ) ,
Executors . defaultThreadFactory ( ) , new ThreadPoolExecutor . AbortPolicy ( ) ) ;
List < Future < List < ThdDataVO > > > resultList = new ArrayList < Future < List < ThdDataVO > > > ( ) ;
//1.查询拓扑图配置的指标:拓扑图扑图配置: 7677f94c749dedaff30f911949cbd724
List < EleEpdPqd > data = csStatisticalSetFeignClient . queryStatisticalSelect ( " 7677f94c749dedaff30f911949cbd724 " ) . getData ( ) ;
data . forEach ( temp - > {
if ( Objects . nonNull ( temp . getHarmStart ( ) ) & & Objects . nonNull ( temp . getHarmEnd ( ) ) ) {
for ( int i = temp . getHarmStart ( ) ; i < temp . getHarmEnd ( ) + 1 ; i + + ) {
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam ( ) ;
commonStatisticalQueryParam . setDevId ( devId ) ;
commonStatisticalQueryParam . setStatisticalId ( temp . getId ( ) ) ;
commonStatisticalQueryParam . setValueType ( " avg " ) ;
commonStatisticalQueryParam . setFrequency ( i + " " ) ;
Future < List < ThdDataVO > > listFuture = executorService . submit ( new TaskWithResult ( commonStatisticalQueryParam ) ) ;
resultList . add ( listFuture ) ;
}
//1.查询拓扑图配置的指标:拓扑图扑图配置: 7677f94c749dedaff30f911949cbd724
List < EleEpdPqd > data = csStatisticalSetFeignClient . queryStatisticalSelect ( " 7677f94c749dedaff30f911949cbd724 " ) . getData ( ) ;
data . forEach ( temp - > {
if ( Objects . nonNull ( temp . getHarmStart ( ) ) & & Objects . nonNull ( temp . getHarmEnd ( ) ) ) {
for ( int i = temp . getHarmStart ( ) ; i < temp . getHarmEnd ( ) + 1 ; i + + ) {
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam ( ) ;
commonStatisticalQueryParam . setDevId ( devId ) ;
commonStatisticalQueryParam . setStatisticalId ( temp . getId ( ) ) ;
commonStatisticalQueryParam . setValueType ( " avg " ) ;
commonStatisticalQueryParam . setFrequency ( i + " " ) ;
Future < List < ThdDataVO > > listFuture = executorService . submit ( new TaskWithResult ( commonStatisticalQueryParam ) ) ;
resultList . add ( listFuture ) ;
}
} else {
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam ( ) ;
commonStatisticalQueryParam . setDevId ( devId ) ;
commonStatisticalQueryParam . setStatisticalId ( temp . getId ( ) ) ;
commonStatisticalQueryParam . setValueType ( " avg " ) ;
Future < List < ThdDataVO > > listFuture = executorService . submit ( new TaskWithResult ( commonStatisticalQueryParam ) ) ;
resultList . add ( listFuture ) ;
}
} else {
CommonStatisticalQueryParam commonStatisticalQueryParam = new CommonStatisticalQueryParam ( ) ;
commonStatisticalQueryParam . setDevId ( devId ) ;
commonStatisticalQueryParam . setStatisticalId ( temp . getId ( ) ) ;
commonStatisticalQueryParam . setValueType ( " avg " ) ;
Future < List < ThdDataVO > > listFuture = executorService . submit ( new TaskWithResult ( commonStatisticalQueryParam ) ) ;
resultList . add ( listFuture ) ;
}
} ) ;
executorService . shutdown ( ) ;
} ) ;
executorService . shutdown ( ) ;
resultList . forEach ( temp - > {
try {
tempList . addAll ( temp . get ( ) ) ;
} catch ( InterruptedException e ) {
throw new RuntimeException ( e ) ;
} catch ( ExecutionException e ) {
throw new RuntimeException ( e ) ;
}
} ) ;
//过滤M相
List < ThdDataVO > m = tempList . stream ( ) . filter ( temp - > Objects . equals ( temp . getPhase ( ) , " M " ) ) . collect ( Collectors . toList ( ) ) ;
m . stream ( ) . forEach ( temp - > {
Stream . of ( " A " , " B " , " C " ) . forEach ( phase - > {
ThdDataVO thdDataVO = new ThdDataVO ( ) ;
BeanUtils . copyProperties ( temp , thdDataVO ) ;
thdDataVO . setPhase ( phase ) ;
result . add ( thdDataVO ) ;
} ) ;
} ) ;
//过滤谐波电流,谐波电压畸变率求平均值
List < ThdDataVO > thdI = tempList . stream ( ) . filter ( temp - > Objects . equals ( temp . getStatisticalName ( ) , " Pq_ThdU(%) " ) ) . collect ( Collectors . toList ( ) ) ;
Map < String , List < ThdDataVO > > collect = thdI . stream ( ) . collect ( Collectors . groupingBy ( ThdDataVO : : getLineId ) ) ;
collect . forEach ( ( k , v ) - > {
if ( ! CollectionUtil . isEmpty ( v ) ) {
double asDouble = v . stream ( ) . mapToDouble ( ThdDataVO : : getStatisticalData ) . average ( ) . getAsDouble ( ) ;
ThdDataVO thdDataVO = new ThdDataVO ( ) ;
BeanUtils . copyProperties ( v . get ( 0 ) , thdDataVO ) ;
thdDataVO . setStatisticalData ( Double . valueOf ( df . format ( asDouble ) ) ) ;
thdDataVO . setPhase ( " avg " ) ;
result . add ( thdDataVO ) ;
resultList . forEach ( temp - > {
try {
tempList . addAll ( temp . get ( ) ) ;
} catch ( InterruptedException e ) {
throw new RuntimeException ( e ) ;
} catch ( ExecutionException e ) {
throw new RuntimeException ( e ) ;
}
} ) ;
//过滤M相
List < ThdDataVO > m = tempList . stream ( ) . filter ( temp - > Objects . equals ( temp . getPhase ( ) , " M " ) ) . collect ( Collectors . toList ( ) ) ;
m . stream ( ) . forEach ( temp - > {
Stream . of ( " A " , " B " , " C " ) . forEach ( phase - > {
ThdDataVO thdDataVO = new ThdDataVO ( ) ;
BeanUtils . copyProperties ( temp , thdDataVO ) ;
thdDataVO . setPhase ( phase ) ;
result . add ( thdDataVO ) ;
} ) ;
} ) ;
//过滤谐波电流,谐波电压畸变率求平均值
List < ThdDataVO > thdI = tempList . stream ( ) . filter ( temp - > Objects . equals ( temp . getStatisticalName ( ) , " Pq_ThdU(%) " ) ) . collect ( Collectors . toList ( ) ) ;
Map < String , List < ThdDataVO > > collect = thdI . stream ( ) . collect ( Collectors . groupingBy ( ThdDataVO : : getLineId ) ) ;
collect . forEach ( ( k , v ) - > {
if ( ! CollectionUtil . isEmpty ( v ) ) {
double asDouble = v . stream ( ) . mapToDouble ( ThdDataVO : : getStatisticalData ) . average ( ) . getAsDouble ( ) ;
ThdDataVO thdDataVO = new ThdDataVO ( ) ;
BeanUtils . copyProperties ( v . get ( 0 ) , thdDataVO ) ;
thdDataVO . setStatisticalData ( Double . valueOf ( df . format ( asDouble ) ) ) ;
thdDataVO . setPhase ( " avg " ) ;
result . add ( thdDataVO ) ;
}
} ) ;
}
} ) ;
List < ThdDataVO > thdV = tempList . stream ( ) . filter ( temp - > Objects . equals ( temp . getStatisticalName ( ) , " Pq_ThdI(%) " ) ) . collect ( Collectors . toList ( ) ) ;
Map < String , List < ThdDataVO > > collect1 = thdV . stream ( ) . collect ( Collectors . groupingBy ( ThdDataVO : : getLineId ) ) ;
collect1 . forEach ( ( k , v ) - > {
if ( ! CollectionUtil . isEmpty ( v ) ) {
double asDouble = v . stream ( ) . mapToDouble ( ThdDataVO : : getStatisticalData ) . average ( ) . getAsDouble ( ) ;
ThdDataVO thdDataVO = new ThdDataVO ( ) ;
BeanUtils . copyProperties ( v . get ( 0 ) , thdDataVO ) ;
thdDataVO . setStatisticalData ( Double . valueOf ( df . format ( asDouble ) ) ) ;
thdDataVO . setPhase ( " avg " ) ;
result . add ( thdDataVO ) ;
List < ThdDataVO > thdV = tempList . stream ( ) . filter ( temp - > Objects . equals ( temp . getStatisticalName ( ) , " Pq_ThdI(%) " ) ) . collect ( Collectors . toList ( ) ) ;
Map < String , List < ThdDataVO > > collect1 = thdV . stream ( ) . collect ( Collectors . groupingBy ( ThdDataVO : : getLineId ) ) ;
collect1 . forEach ( ( k , v ) - > {
if ( ! CollectionUtil . isEmpty ( v ) ) {
double asDouble = v . stream ( ) . mapToDouble ( ThdDataVO : : getStatisticalData ) . average ( ) . getAsDouble ( ) ;
ThdDataVO thdDataVO = new ThdDataVO ( ) ;
BeanUtils . copyProperties ( v . get ( 0 ) , thdDataVO ) ;
thdDataVO . setStatisticalData ( Double . valueOf ( df . format ( asDouble ) ) ) ;
thdDataVO . setPhase ( " avg " ) ;
result . add ( thdDataVO ) ;
}
} ) ;
List < ThdDataVO > apfThdI = tempList . stream ( ) . filter ( temp - > Objects . equals ( temp . getStatisticalName ( ) , " Apf_ThdA_Load(%) " ) ) . collect ( Collectors . toList ( ) ) ;
Map < String , List < ThdDataVO > > collect3 = apfThdI . stream ( ) . collect ( Collectors . groupingBy ( ThdDataVO : : getLineId ) ) ;
collect3 . forEach ( ( k , v ) - > {
if ( ! CollectionUtil . isEmpty ( v ) ) {
double asDouble = v . stream ( ) . mapToDouble ( ThdDataVO : : getStatisticalData ) . average ( ) . getAsDouble ( ) ;
ThdDataVO thdDataVO = new ThdDataVO ( ) ;
BeanUtils . copyProperties ( v . get ( 0 ) , thdDataVO ) ;
thdDataVO . setStatisticalData ( Double . valueOf ( df . format ( asDouble ) ) ) ;
thdDataVO . setPhase ( " avg " ) ;
result . add ( thdDataVO ) ;
}
} ) ;
List < ThdDataVO > apfThdI = tempList . stream ( ) . filter ( temp - > Objects . equals ( temp . getStatisticalName ( ) , " Apf_ThdA_Load(%) " ) ) . collect ( Collectors . toList ( ) ) ;
Map < String , List < ThdDataVO > > collect3 = apfThdI . stream ( ) . collect ( Collectors . groupingBy ( ThdDataVO : : getLineId ) ) ;
collect3 . forEach ( ( k , v ) - > {
if ( ! CollectionUtil . isEmpty ( v ) ) {
double asDouble = v . stream ( ) . mapToDouble ( ThdDataVO : : getStatisticalData ) . average ( ) . getAsDouble ( ) ;
ThdDataVO thdDataVO = new ThdDataVO ( ) ;
BeanUtils . copyProperties ( v . get ( 0 ) , thdDataVO ) ;
thdDataVO . setStatisticalData ( Double . valueOf ( df . format ( asDouble ) ) ) ;
thdDataVO . setPhase ( " avg " ) ;
result . add ( thdDataVO ) ;
}
} ) ;
List < ThdDataVO > apfThdISys = tempList . stream ( ) . filter ( temp - > Objects . equals ( temp . getStatisticalName ( ) , " Apf_ThdA_Sys(%) " ) ) . collect ( Collectors . toList ( ) ) ;
Map < String , List < ThdDataVO > > collect4 = apfThdISys . stream ( ) . collect ( Collectors . groupingBy ( ThdDataVO : : getLineId ) ) ;
collect4 . forEach ( ( k , v ) - > {
if ( ! CollectionUtil . isEmpty ( v ) ) {
double asDouble = v . stream ( ) . mapToDouble ( ThdDataVO : : getStatisticalData ) . average ( ) . getAsDouble ( ) ;
ThdDataVO thdDataVO = new ThdDataVO ( ) ;
BeanUtils . copyProperties ( v . get ( 0 ) , thdDataVO ) ;
thdDataVO . setStatisticalData ( Double . valueOf ( df . format ( asDouble ) ) ) ;
thdDataVO . setPhase ( " avg " ) ;
result . add ( thdDataVO ) ;
}
} ) ;
List < ThdDataVO > apfThdISys = tempList . stream ( ) . filter ( temp - > Objects . equals ( temp . getStatisticalName ( ) , " Apf_ThdA_Sys(%) " ) ) . collect ( Collectors . toList ( ) ) ;
Map < String , List < ThdDataVO > > collect4 = apfThdISys . stream ( ) . collect ( Collectors . groupingBy ( ThdDataVO : : getLineId ) ) ;
collect4 . forEach ( ( k , v ) - > {
if ( ! CollectionUtil . isEmpty ( v ) ) {
double asDouble = v . stream ( ) . mapToDouble ( ThdDataVO : : getStatisticalData ) . average ( ) . getAsDouble ( ) ;
ThdDataVO thdDataVO = new ThdDataVO ( ) ;
BeanUtils . copyProperties ( v . get ( 0 ) , thdDataVO ) ;
thdDataVO . setStatisticalData ( Double . valueOf ( df . format ( asDouble ) ) ) ;
thdDataVO . setPhase ( " avg " ) ;
result . add ( thdDataVO ) ;
}
} ) ;
}
} ) ;
List < ThdDataVO > notM = tempList . stream ( ) . filter ( temp - > ! Objects . equals ( temp . getPhase ( ) , " M " ) ) . collect ( Collectors . toList ( ) ) ;
List < ThdDataVO > notM = tempList . stream ( ) . filter ( temp - > ! Objects . equals ( temp . getPhase ( ) , " M " ) ) . collect ( Collectors . toList ( ) ) ;
result . addAll ( notM ) ;
result . addAll ( notM ) ;
Gson gson = new Gson ( ) ;
publisher . send ( " /zl/devData/ " + devId , gson . toJson ( result ) , 1 , false ) ;
Gson gson = new Gson ( ) ;
topoDataJson = gson . toJson ( result ) ;
redisUtil . saveByKeyWithExpire ( devId , ( Object ) topoDataJson , Long . valueOf ( 10 * 60 ) ) ;
}
publisher . send ( " /zl/devData/ " + devId , topoDataJson , 1 , false ) ;
}
public String getCldidName ( String cldid ) {