暂态事件处理
This commit is contained in:
@@ -20,6 +20,8 @@ public enum AccessResponseEnum {
|
|||||||
MODEL_REPEAT("A0302", "模板存在,请勿重复录入!"),
|
MODEL_REPEAT("A0302", "模板存在,请勿重复录入!"),
|
||||||
MODEL_NO_FIND("A0302", "模板不存在,请先录入模板数据!"),
|
MODEL_NO_FIND("A0302", "模板不存在,请先录入模板数据!"),
|
||||||
|
|
||||||
|
DICT_ANALYSIS_ERROR("A0303","字典解析错误!"),
|
||||||
|
MODEL_ANALYSIS_ERROR("A0303","模板解析错误!"),
|
||||||
MESSAGE_TYPE_ERROR("A0303","报文消息类型Type错误!"),
|
MESSAGE_TYPE_ERROR("A0303","报文消息类型Type错误!"),
|
||||||
DEV_TYPE_ERROR("A0303","装置类型错误!"),
|
DEV_TYPE_ERROR("A0303","装置类型错误!"),
|
||||||
DEV_NOT_FIND("A0303","装置类型未找到!"),
|
DEV_NOT_FIND("A0303","装置类型未找到!"),
|
||||||
@@ -58,6 +60,8 @@ public enum AccessResponseEnum {
|
|||||||
LDEVINFO_IS_NULL("A0309","逻辑设备信息为空"),
|
LDEVINFO_IS_NULL("A0309","逻辑设备信息为空"),
|
||||||
SOFTINFO_IS_NULL("A0309","软件信息为空"),
|
SOFTINFO_IS_NULL("A0309","软件信息为空"),
|
||||||
|
|
||||||
|
LINE_POSITION_REPEAT("A0310","监测点位置重复")
|
||||||
|
|
||||||
;
|
;
|
||||||
|
|
||||||
private final String code;
|
private final String code;
|
||||||
|
|||||||
@@ -62,6 +62,7 @@ public enum TypeEnum {
|
|||||||
DATA_12("12","定值Set"),
|
DATA_12("12","定值Set"),
|
||||||
DATA_13("13","内部定值InSet"),
|
DATA_13("13","内部定值InSet"),
|
||||||
DATA_14("14","控制Ctrl"),
|
DATA_14("14","控制Ctrl"),
|
||||||
|
DATA_16("16","波形文件"),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 数据模型列表
|
* 数据模型列表
|
||||||
|
|||||||
@@ -0,0 +1,94 @@
|
|||||||
|
package com.njcn.access.pojo.dto;
|
||||||
|
|
||||||
|
import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
|
||||||
|
import io.swagger.annotations.ApiModelProperty;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 类的介绍:
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @version 1.0.0
|
||||||
|
* @createTime 2023/8/22 11:36
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class EventDto {
|
||||||
|
|
||||||
|
@SerializedName("Mid")
|
||||||
|
private Integer mid;
|
||||||
|
|
||||||
|
@SerializedName("Did")
|
||||||
|
@ApiModelProperty("逻辑设备 治理逻辑设备为1 电能质量设备为2")
|
||||||
|
private Integer did;
|
||||||
|
|
||||||
|
@SerializedName("Pri")
|
||||||
|
private Integer pri;
|
||||||
|
|
||||||
|
@SerializedName("Type")
|
||||||
|
private Integer type;
|
||||||
|
|
||||||
|
@SerializedName("Msg")
|
||||||
|
private Msg msg;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public static class Msg{
|
||||||
|
|
||||||
|
@SerializedName("Cldid")
|
||||||
|
@ApiModelProperty("逻辑子设备 治理逻辑设备为0 电能质量设备为1、2")
|
||||||
|
private Integer clDid;
|
||||||
|
|
||||||
|
@SerializedName("DataType")
|
||||||
|
private Integer dataType;
|
||||||
|
|
||||||
|
@SerializedName("DataAttr")
|
||||||
|
@ApiModelProperty("数据属性:无-0、实时-1、统计-2")
|
||||||
|
private Integer dataAttr;
|
||||||
|
|
||||||
|
@SerializedName("DsNameIdx")
|
||||||
|
private Integer dsNameIdx;
|
||||||
|
|
||||||
|
@SerializedName("DataArray")
|
||||||
|
private List<DataArray> dataArray;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public static class DataArray{
|
||||||
|
|
||||||
|
@SerializedName("Idx")
|
||||||
|
private Integer idx;
|
||||||
|
|
||||||
|
@SerializedName("Name")
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
@SerializedName("DataTimeSec")
|
||||||
|
private Long dataTimeSec;
|
||||||
|
|
||||||
|
@SerializedName("DataTimeUSec")
|
||||||
|
private Long dataTimeUSec;
|
||||||
|
|
||||||
|
@SerializedName("Type")
|
||||||
|
@ApiModelProperty("事件类型")
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
@SerializedName("Parm")
|
||||||
|
private List<Param> param;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public static class Param{
|
||||||
|
|
||||||
|
@SerializedName("Name")
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
@SerializedName("Type")
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
@SerializedName("Unit")
|
||||||
|
private String unit;
|
||||||
|
|
||||||
|
@SerializedName("Data")
|
||||||
|
private Object data;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,62 +0,0 @@
|
|||||||
package com.njcn.access.pojo.po;
|
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.annotation.TableId;
|
|
||||||
import com.baomidou.mybatisplus.annotation.TableName;
|
|
||||||
import com.github.jeffreyning.mybatisplus.anno.MppMultiId;
|
|
||||||
import com.njcn.db.bo.BaseEntity;
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.EqualsAndHashCode;
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.Setter;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>
|
|
||||||
* 事件参数表
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* @author xuyang
|
|
||||||
* @since 2023-05-19
|
|
||||||
*/
|
|
||||||
@EqualsAndHashCode(callSuper = true)
|
|
||||||
@Data
|
|
||||||
@TableName("cs_evt_parm")
|
|
||||||
public class CsEvtParmPO extends BaseEntity {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* id
|
|
||||||
*/
|
|
||||||
private String id;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* pid
|
|
||||||
*/
|
|
||||||
@MppMultiId(value = "pid")
|
|
||||||
private String pid;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 事件信息名称
|
|
||||||
*/
|
|
||||||
@MppMultiId(value = "name")
|
|
||||||
private String name;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 参数类型“string”“float”
|
|
||||||
*/
|
|
||||||
private String type;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 参数单位
|
|
||||||
*/
|
|
||||||
private String unit;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 参数值
|
|
||||||
*/
|
|
||||||
private String data;
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -25,8 +25,11 @@ import com.njcn.csdevice.api.DataSetFeignClient;
|
|||||||
import com.njcn.csdevice.api.DevModelFeignClient;
|
import com.njcn.csdevice.api.DevModelFeignClient;
|
||||||
import com.njcn.csdevice.pojo.po.CsDataSet;
|
import com.njcn.csdevice.pojo.po.CsDataSet;
|
||||||
import com.njcn.csdevice.pojo.po.CsDevModelPO;
|
import com.njcn.csdevice.pojo.po.CsDevModelPO;
|
||||||
|
import com.njcn.mq.constant.BusinessTopic;
|
||||||
import com.njcn.mq.message.AppAutoDataMessage;
|
import com.njcn.mq.message.AppAutoDataMessage;
|
||||||
|
import com.njcn.mq.message.AppEventMessage;
|
||||||
import com.njcn.mq.template.AppAutoDataMessageTemplate;
|
import com.njcn.mq.template.AppAutoDataMessageTemplate;
|
||||||
|
import com.njcn.mq.template.AppEventMessageTemplate;
|
||||||
import com.njcn.redis.pojo.enums.AppRedisKey;
|
import com.njcn.redis.pojo.enums.AppRedisKey;
|
||||||
import com.njcn.redis.utils.RedisUtil;
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
@@ -65,9 +68,11 @@ public class MqttMessageHandler {
|
|||||||
|
|
||||||
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
|
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
|
||||||
|
|
||||||
|
private final DataSetFeignClient dataSetFeignClient;
|
||||||
|
|
||||||
private final AppAutoDataMessageTemplate appAutoDataMessageTemplate;
|
private final AppAutoDataMessageTemplate appAutoDataMessageTemplate;
|
||||||
|
|
||||||
private final DataSetFeignClient dataSetFeignClient;
|
private final AppEventMessageTemplate appEventMessageTemplate;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
Validator validator;
|
Validator validator;
|
||||||
@@ -293,12 +298,33 @@ public class MqttMessageHandler {
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case 4866:
|
case 4866:
|
||||||
//处理主动上送数据
|
|
||||||
AutoDataDto dataDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), AutoDataDto.class);
|
AutoDataDto dataDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), AutoDataDto.class);
|
||||||
JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(dataDto));
|
switch (dataDto.getMsg().getDataAttr()) {
|
||||||
AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject, AppAutoDataMessage.class);
|
//暂态事件、录波处理
|
||||||
|
//todo 后期告警可能也是在这处理,通过是告警还是事件来区分暂态和稳态
|
||||||
|
case 0:
|
||||||
|
log.info("处理事件");
|
||||||
|
EventDto eventDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), EventDto.class);
|
||||||
|
JSONObject jsonObject0 = JSONObject.parseObject(JSON.toJSONString(eventDto));
|
||||||
|
AppEventMessage appEventMessage = JSONObject.toJavaObject(jsonObject0, AppEventMessage.class);
|
||||||
|
appEventMessage.setId(nDid);
|
||||||
|
appEventMessageTemplate.sendMember(appEventMessage);
|
||||||
|
break;
|
||||||
|
//实时数据
|
||||||
|
case 1:
|
||||||
|
log.info("处理实时数据");
|
||||||
|
break;
|
||||||
|
//处理主动上送的统计数据
|
||||||
|
case 2:
|
||||||
|
log.info("处理统计数据");
|
||||||
|
JSONObject jsonObject2 = JSONObject.parseObject(JSON.toJSONString(dataDto));
|
||||||
|
AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject2, AppAutoDataMessage.class);
|
||||||
appAutoDataMessage.setId(nDid);
|
appAutoDataMessage.setId(nDid);
|
||||||
appAutoDataMessageTemplate.sendMember(appAutoDataMessage);
|
appAutoDataMessageTemplate.sendMember(appAutoDataMessage);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
//mid大于0,则需要应答设备侧
|
//mid大于0,则需要应答设备侧
|
||||||
if (dataDto.getMid() > 0){
|
if (dataDto.getMid() > 0){
|
||||||
ReqAndResDto.Res response = new ReqAndResDto.Res();
|
ReqAndResDto.Res response = new ReqAndResDto.Res();
|
||||||
|
|||||||
@@ -85,8 +85,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
|
|||||||
analysisDataSet(templateDto,csDevModelPo.getId());
|
analysisDataSet(templateDto,csDevModelPo.getId());
|
||||||
//3.录入监测点模板表(记录当前模板有几个监测点,治理类型的模板目前规定1个监测点,电能质量模板根据逻辑子设备来)
|
//3.录入监测点模板表(记录当前模板有几个监测点,治理类型的模板目前规定1个监测点,电能质量模板根据逻辑子设备来)
|
||||||
addCsLineModel(templateDto,csDevModelPo.getId());
|
addCsLineModel(templateDto,csDevModelPo.getId());
|
||||||
} catch (IOException e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
throw new BusinessException(AccessResponseEnum.MODEL_ANALYSIS_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -99,8 +99,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
|
|||||||
Gson gson = new Gson();
|
Gson gson = new Gson();
|
||||||
TemplateDto templateDto = gson.fromJson(json, TemplateDto.class);
|
TemplateDto templateDto = gson.fromJson(json, TemplateDto.class);
|
||||||
analysisDict(templateDto);
|
analysisDict(templateDto);
|
||||||
} catch (IOException e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
throw new BusinessException(AccessResponseEnum.DICT_ANALYSIS_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -201,6 +201,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
|
|||||||
eleEvtParam.setPid(po.getId());
|
eleEvtParam.setPid(po.getId());
|
||||||
eleEvtParam.setData(param.getData());
|
eleEvtParam.setData(param.getData());
|
||||||
eleEvtParam.setName(param.getName());
|
eleEvtParam.setName(param.getName());
|
||||||
|
eleEvtParam.setShowName(param.getName());
|
||||||
eleEvtParam.setType(param.getType());
|
eleEvtParam.setType(param.getType());
|
||||||
eleEvtParam.setUnit(param.getUnit());
|
eleEvtParam.setUnit(param.getUnit());
|
||||||
eleEvtFeignClient.add(eleEvtParam);
|
eleEvtFeignClient.add(eleEvtParam);
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.njcn.access.service.impl;
|
package com.njcn.access.service.impl;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
import cn.hutool.core.util.IdUtil;
|
import cn.hutool.core.util.IdUtil;
|
||||||
import com.alibaba.excel.util.CollectionUtils;
|
import com.alibaba.excel.util.CollectionUtils;
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
@@ -46,6 +47,7 @@ import java.util.ArrayList;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 类的介绍:
|
* 类的介绍:
|
||||||
@@ -228,6 +230,11 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
|||||||
appLineTopologyDiagramPo.setStatus("1");
|
appLineTopologyDiagramPo.setStatus("1");
|
||||||
appLineTopologyDiagramPoList.add(appLineTopologyDiagramPo);
|
appLineTopologyDiagramPoList.add(appLineTopologyDiagramPo);
|
||||||
}
|
}
|
||||||
|
List<String> position = csLinePoList.stream().map(CsLinePO::getPosition).collect(Collectors.toList());
|
||||||
|
List<String> lineList = position.stream().filter(e-> Collections.frequency(position,e) > 1).distinct().collect(Collectors.toList());
|
||||||
|
if (CollectionUtil.isNotEmpty(lineList)){
|
||||||
|
throw new BusinessException(AccessResponseEnum.LINE_POSITION_REPEAT);
|
||||||
|
}
|
||||||
csLineService.saveBatch(csLinePoList);
|
csLineService.saveBatch(csLinePoList);
|
||||||
//4.监测点拓扑图表录入关系
|
//4.监测点拓扑图表录入关系
|
||||||
appLineTopologyDiagramService.saveBatch(appLineTopologyDiagramPoList);
|
appLineTopologyDiagramService.saveBatch(appLineTopologyDiagramPoList);
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package com.njcn;
|
|||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.IdUtil;
|
||||||
import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken;
|
import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken;
|
||||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||||
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||||
@@ -11,6 +12,7 @@ import com.njcn.access.enums.AccessEnum;
|
|||||||
import com.njcn.access.enums.TypeEnum;
|
import com.njcn.access.enums.TypeEnum;
|
||||||
import com.njcn.access.pojo.dto.ReqAndResDto;
|
import com.njcn.access.pojo.dto.ReqAndResDto;
|
||||||
import com.njcn.access.pojo.dto.mqtt.MqttClientDto;
|
import com.njcn.access.pojo.dto.mqtt.MqttClientDto;
|
||||||
|
import com.njcn.csdevice.pojo.po.CsLinePO;
|
||||||
import io.lettuce.core.protocol.CompleteableCommand;
|
import io.lettuce.core.protocol.CompleteableCommand;
|
||||||
import okhttp3.Credentials;
|
import okhttp3.Credentials;
|
||||||
import okhttp3.OkHttpClient;
|
import okhttp3.OkHttpClient;
|
||||||
@@ -25,8 +27,9 @@ import org.springframework.test.context.web.WebAppConfiguration;
|
|||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Array;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Objects;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@@ -56,12 +59,34 @@ public class AppTest
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() {
|
public void test() {
|
||||||
ReqAndResDto reqAndResParam = new ReqAndResDto();
|
// ReqAndResDto reqAndResParam = new ReqAndResDto();
|
||||||
reqAndResParam.setMid(1);
|
// reqAndResParam.setMid(1);
|
||||||
reqAndResParam.setDid(0);
|
// reqAndResParam.setDid(0);
|
||||||
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
|
||||||
reqAndResParam.setType(4866);
|
// reqAndResParam.setType(4866);
|
||||||
publisher.send("/Dev/Data1/V1/123", new Gson().toJson(reqAndResParam),1,false);
|
// publisher.send("/Dev/Data1/V1/123", new Gson().toJson(reqAndResParam),1,false);
|
||||||
|
|
||||||
|
// String key = String.valueOf(IdUtil.getSnowflake().nextId());
|
||||||
|
// System.out.println("key==:" + key);
|
||||||
|
|
||||||
|
List<CsLinePO> csLinePoList = new ArrayList<>();
|
||||||
|
CsLinePO po1 = new CsLinePO();
|
||||||
|
po1.setPosition("1");
|
||||||
|
CsLinePO po2= new CsLinePO();
|
||||||
|
po2.setPosition("2");
|
||||||
|
CsLinePO po3= new CsLinePO();
|
||||||
|
po3.setPosition("3");
|
||||||
|
CsLinePO po4= new CsLinePO();
|
||||||
|
po4.setPosition("1");
|
||||||
|
|
||||||
|
csLinePoList.add(po1);
|
||||||
|
csLinePoList.add(po2);
|
||||||
|
csLinePoList.add(po3);
|
||||||
|
csLinePoList.add(po4);
|
||||||
|
List<String> l = csLinePoList.stream().map(CsLinePO::getPosition).collect(Collectors.toList());
|
||||||
|
System.out.println("l===:" + l);
|
||||||
|
List<String> lineList = l.stream().filter(e-> Collections.frequency(l,e) > 1).distinct().collect(Collectors.toList());
|
||||||
|
System.out.println("lineList==:" + lineList);
|
||||||
}
|
}
|
||||||
|
|
||||||
// public static void main(String[] args) {
|
// public static void main(String[] args) {
|
||||||
|
|||||||
@@ -56,8 +56,6 @@ public class StatServiceImpl implements IStatService {
|
|||||||
|
|
||||||
private final RedisUtil redisUtil;
|
private final RedisUtil redisUtil;
|
||||||
|
|
||||||
private final Integer NUMBER = 200;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional(rollbackFor = Exception.class)
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public void analysis(AppAutoDataMessage appAutoDataMessage) {
|
public void analysis(AppAutoDataMessage appAutoDataMessage) {
|
||||||
|
|||||||
@@ -1,14 +1,38 @@
|
|||||||
package com.njcn;
|
package com.njcn;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||||
|
import com.njcn.common.utils.PubUtils;
|
||||||
|
import com.njcn.csdevice.pojo.po.CsDataArray;
|
||||||
|
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
|
||||||
|
import com.njcn.influxdb.utils.InfluxDbUtils;
|
||||||
|
import com.njcn.mq.message.AppAutoDataMessage;
|
||||||
|
import com.njcn.redis.pojo.enums.AppRedisKey;
|
||||||
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
|
import org.influxdb.InfluxDB;
|
||||||
|
import org.influxdb.dto.BatchPoints;
|
||||||
|
import org.influxdb.dto.Point;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.springframework.web.bind.annotation.ResponseBody;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit test for simple App.
|
* Unit test for simple App.
|
||||||
*/
|
*/
|
||||||
public class AppTest
|
public class AppTest
|
||||||
{
|
{
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RedisUtil redisUtil;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private InfluxDbUtils influxDbUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Rigorous Test :-)
|
* Rigorous Test :-)
|
||||||
*/
|
*/
|
||||||
@@ -17,4 +41,5 @@ public class AppTest
|
|||||||
{
|
{
|
||||||
assertTrue( true );
|
assertTrue( true );
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,32 @@
|
|||||||
<version>1.0.0</version>
|
<version>1.0.0</version>
|
||||||
</parent>
|
</parent>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>common-core</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>common-db</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>common-microservice</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>common-mq</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
<artifactId>zl-event-api</artifactId>
|
<artifactId>zl-event-api</artifactId>
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,18 @@
|
|||||||
|
package com.njcn.zlevent.api;
|
||||||
|
|
||||||
|
import com.njcn.common.pojo.constant.ServerInfo;
|
||||||
|
import com.njcn.common.pojo.response.HttpResult;
|
||||||
|
import com.njcn.mq.message.AppEventMessage;
|
||||||
|
import com.njcn.zlevent.api.fallback.EventClientFallbackFactory;
|
||||||
|
import org.springframework.cloud.openfeign.FeignClient;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xy
|
||||||
|
*/
|
||||||
|
@FeignClient(value = ServerInfo.CS_ZL_EVENT_BOOT, path = "/event", fallbackFactory = EventClientFallbackFactory.class,contextId = "event")
|
||||||
|
public interface EventFeignClient {
|
||||||
|
|
||||||
|
@PostMapping("/analysis")
|
||||||
|
HttpResult<String> analysis(AppEventMessage appEventMessage);
|
||||||
|
}
|
||||||
@@ -0,0 +1,35 @@
|
|||||||
|
package com.njcn.zlevent.api.fallback;
|
||||||
|
|
||||||
|
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||||
|
import com.njcn.common.pojo.exception.BusinessException;
|
||||||
|
import com.njcn.common.pojo.response.HttpResult;
|
||||||
|
import com.njcn.mq.message.AppEventMessage;
|
||||||
|
import com.njcn.zlevent.api.EventFeignClient;
|
||||||
|
import feign.hystrix.FallbackFactory;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xy
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class EventClientFallbackFactory implements FallbackFactory<EventFeignClient> {
|
||||||
|
@Override
|
||||||
|
public EventFeignClient create(Throwable cause) {
|
||||||
|
//判断抛出异常是否为解码器抛出的业务异常
|
||||||
|
Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
|
||||||
|
if (cause.getCause() instanceof BusinessException) {
|
||||||
|
BusinessException businessException = (BusinessException) cause.getCause();
|
||||||
|
}
|
||||||
|
Enum<?> finalExceptionEnum = exceptionEnum;
|
||||||
|
return new EventFeignClient() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HttpResult<String> analysis(AppEventMessage appEventMessage) {
|
||||||
|
log.error("{}异常,降级处理,异常为:{}","数据解析",cause.toString());
|
||||||
|
throw new BusinessException(finalExceptionEnum);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,48 @@
|
|||||||
|
package com.njcn.zlevent.pojo.po;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
|
import com.njcn.db.bo.BaseEntity;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 暂态事件表
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @since 2023-08-23
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@TableName("cs_event")
|
||||||
|
public class CsEvent {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* id
|
||||||
|
*/
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 监测点id
|
||||||
|
*/
|
||||||
|
private String lineId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 事件名称
|
||||||
|
*/
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 展示名称
|
||||||
|
*/
|
||||||
|
private String showName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 开始时间
|
||||||
|
*/
|
||||||
|
private LocalDateTime startTime;
|
||||||
|
}
|
||||||
@@ -0,0 +1,61 @@
|
|||||||
|
package com.njcn.zlevent.pojo.po;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 暂态事件详情表
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @since 2023-08-23
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@TableName("cs_event_detail")
|
||||||
|
public class CsEventDetail {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* id
|
||||||
|
*/
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 暂态事件id
|
||||||
|
*/
|
||||||
|
private String pid;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指标名称
|
||||||
|
*/
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指标别名
|
||||||
|
*/
|
||||||
|
private String showName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 数据类型
|
||||||
|
*/
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 单位
|
||||||
|
*/
|
||||||
|
private String unit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 数值
|
||||||
|
*/
|
||||||
|
private Double data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 相别
|
||||||
|
*/
|
||||||
|
private String phasic;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -20,6 +20,15 @@
|
|||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.baomidou</groupId>
|
||||||
|
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>common-db</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.njcn</groupId>
|
<groupId>com.njcn</groupId>
|
||||||
<artifactId>common-web</artifactId>
|
<artifactId>common-web</artifactId>
|
||||||
@@ -39,6 +48,31 @@
|
|||||||
<artifactId>common-db</artifactId>
|
<artifactId>common-db</artifactId>
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>common-mq</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>cs-device-api</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>stat-api</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>zl-event-api</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>system-api</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@@ -0,0 +1,47 @@
|
|||||||
|
package com.njcn.zlevent.controller;
|
||||||
|
|
||||||
|
import com.njcn.common.pojo.annotation.OperateInfo;
|
||||||
|
import com.njcn.common.pojo.enums.common.LogEnum;
|
||||||
|
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||||
|
import com.njcn.common.pojo.response.HttpResult;
|
||||||
|
import com.njcn.common.utils.HttpResultUtil;
|
||||||
|
import com.njcn.mq.message.AppEventMessage;
|
||||||
|
import com.njcn.web.controller.BaseController;
|
||||||
|
import com.njcn.zlevent.service.IEventService;
|
||||||
|
import io.swagger.annotations.Api;
|
||||||
|
import io.swagger.annotations.ApiImplicitParam;
|
||||||
|
import io.swagger.annotations.ApiOperation;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 类的介绍:
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @version 1.0.0
|
||||||
|
* @createTime 2023/8/14 9:23
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/event")
|
||||||
|
@Api(tags = "暂态事件处理")
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class EventController extends BaseController {
|
||||||
|
|
||||||
|
private final IEventService eventService;
|
||||||
|
|
||||||
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
|
@PostMapping("/analysis")
|
||||||
|
@ApiOperation("事件解析")
|
||||||
|
@ApiImplicitParam(name = "appEventMessage", value = "数据实体", required = true)
|
||||||
|
public HttpResult<String> analysis(@RequestBody AppEventMessage appEventMessage){
|
||||||
|
String methodDescribe = getMethodDescribe("analysis");
|
||||||
|
eventService.analysis(appEventMessage);
|
||||||
|
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,41 @@
|
|||||||
|
package com.njcn.zlevent.controller;
|
||||||
|
|
||||||
|
import com.njcn.common.pojo.annotation.OperateInfo;
|
||||||
|
import com.njcn.common.pojo.enums.common.LogEnum;
|
||||||
|
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||||
|
import com.njcn.common.pojo.response.HttpResult;
|
||||||
|
import com.njcn.common.utils.HttpResultUtil;
|
||||||
|
import com.njcn.web.controller.BaseController;
|
||||||
|
import io.swagger.annotations.Api;
|
||||||
|
import io.swagger.annotations.ApiImplicitParam;
|
||||||
|
import io.swagger.annotations.ApiOperation;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 类的介绍:
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @version 1.0.0
|
||||||
|
* @crateTime 2023/8/23 9:59
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/wave")
|
||||||
|
@Api(tags = "录波处理")
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class WaveController extends BaseController {
|
||||||
|
|
||||||
|
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
|
||||||
|
@PostMapping("/analysis")
|
||||||
|
@ApiOperation("录波解析")
|
||||||
|
@ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true)
|
||||||
|
public HttpResult<String> analysis(){
|
||||||
|
String methodDescribe = getMethodDescribe("analysis");
|
||||||
|
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
package com.njcn.zlevent.handler;
|
||||||
|
|
||||||
|
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
|
||||||
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.validation.Validator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xuyang
|
||||||
|
* @version 1.0.0
|
||||||
|
* @date 2023年08月23日 09:41
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class MqttMessageHandler {
|
||||||
|
|
||||||
|
private final MqttPublisher publisher;
|
||||||
|
|
||||||
|
private final RedisUtil redisUtil;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
Validator validator;
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,67 @@
|
|||||||
|
package com.njcn.zlevent.listener;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
|
import com.njcn.common.pojo.exception.BusinessException;
|
||||||
|
import com.njcn.redis.pojo.enums.AppRedisKey;
|
||||||
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
|
import com.njcn.stat.enums.StatResponseEnum;
|
||||||
|
import com.njcn.system.api.EpdFeignClient;
|
||||||
|
import com.njcn.system.pojo.dto.EpdDTO;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.springframework.core.annotation.Order;
|
||||||
|
import org.springframework.data.redis.connection.Message;
|
||||||
|
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
||||||
|
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author hongawen
|
||||||
|
* @version 1.0.0
|
||||||
|
* @date 2022年04月02日 14:31
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private EpdFeignClient epdFeignClient;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RedisUtil redisUtil;
|
||||||
|
|
||||||
|
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
|
||||||
|
super(listenerContainer);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 针对redis数据失效事件,进行数据处理
|
||||||
|
* 注意message.toString()可以获取失效的key
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
@Order(0)
|
||||||
|
public void onMessage(Message message, byte[] pattern) {
|
||||||
|
if (StringUtils.isBlank(message.toString())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//判断失效的key
|
||||||
|
String expiredKey = message.toString();
|
||||||
|
if(expiredKey.equals(AppRedisKey.ELE_EPD_PQD)){
|
||||||
|
Map<String,String> map = new HashMap<>();
|
||||||
|
List<EpdDTO> list = epdFeignClient.findAll().getData();
|
||||||
|
if (CollectionUtil.isEmpty(list)){
|
||||||
|
throw new BusinessException(StatResponseEnum.DICT_NULL);
|
||||||
|
}
|
||||||
|
list.forEach(item->{
|
||||||
|
map.put(item.getDictName(),item.getTableName());
|
||||||
|
});
|
||||||
|
redisUtil.saveByKeyWithExpire(AppRedisKey.ELE_EPD_PQD,map,3600L);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
package com.njcn.zlevent.mapper;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
|
import com.njcn.zlevent.pojo.po.CsEventDetail;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 暂态事件详情表 Mapper 接口
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @since 2023-08-23
|
||||||
|
*/
|
||||||
|
public interface CsEventDetailMapper extends BaseMapper<CsEventDetail> {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
package com.njcn.zlevent.mapper;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
|
import com.njcn.zlevent.pojo.po.CsEvent;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 暂态事件表 Mapper 接口
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @since 2023-08-23
|
||||||
|
*/
|
||||||
|
public interface CsEventMapper extends BaseMapper<CsEvent> {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
package com.njcn.zlevent.service;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.service.IService;
|
||||||
|
import com.njcn.zlevent.pojo.po.CsEventDetail;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 暂态事件详情表 服务类
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @since 2023-08-23
|
||||||
|
*/
|
||||||
|
public interface ICsEventDetailService extends IService<CsEventDetail> {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
package com.njcn.zlevent.service;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.service.IService;
|
||||||
|
import com.njcn.zlevent.pojo.po.CsEvent;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 暂态事件表 服务类
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @since 2023-08-23
|
||||||
|
*/
|
||||||
|
public interface ICsEventService extends IService<CsEvent> {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
package com.njcn.zlevent.service;
|
||||||
|
|
||||||
|
import com.njcn.mq.message.AppEventMessage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xy
|
||||||
|
*/
|
||||||
|
public interface IEventService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 解析事件数据
|
||||||
|
* @param
|
||||||
|
*/
|
||||||
|
void analysis(AppEventMessage appEventMessage);
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,20 @@
|
|||||||
|
package com.njcn.zlevent.service.impl;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
|
import com.njcn.zlevent.mapper.CsEventDetailMapper;
|
||||||
|
import com.njcn.zlevent.pojo.po.CsEventDetail;
|
||||||
|
import com.njcn.zlevent.service.ICsEventDetailService;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 暂态事件详情表 服务实现类
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @since 2023-08-23
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class CsEventDetailServiceImpl extends ServiceImpl<CsEventDetailMapper, CsEventDetail> implements ICsEventDetailService {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,20 @@
|
|||||||
|
package com.njcn.zlevent.service.impl;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
|
import com.njcn.zlevent.mapper.CsEventMapper;
|
||||||
|
import com.njcn.zlevent.pojo.po.CsEvent;
|
||||||
|
import com.njcn.zlevent.service.ICsEventService;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* 暂态事件表 服务实现类
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @since 2023-08-23
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class CsEventServiceImpl extends ServiceImpl<CsEventMapper, CsEvent> implements ICsEventService {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,141 @@
|
|||||||
|
package com.njcn.zlevent.service.impl;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
|
import cn.hutool.core.util.IdUtil;
|
||||||
|
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||||
|
import com.njcn.common.pojo.exception.BusinessException;
|
||||||
|
import com.njcn.csdevice.api.CsLineFeignClient;
|
||||||
|
import com.njcn.csdevice.pojo.po.CsLinePO;
|
||||||
|
import com.njcn.mq.message.AppEventMessage;
|
||||||
|
import com.njcn.redis.pojo.enums.AppRedisKey;
|
||||||
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
|
import com.njcn.stat.enums.StatResponseEnum;
|
||||||
|
import com.njcn.system.api.DicDataFeignClient;
|
||||||
|
import com.njcn.system.api.EpdFeignClient;
|
||||||
|
import com.njcn.system.enums.DicDataEnum;
|
||||||
|
import com.njcn.system.pojo.po.DictData;
|
||||||
|
import com.njcn.zlevent.pojo.po.CsEvent;
|
||||||
|
import com.njcn.zlevent.pojo.po.CsEventDetail;
|
||||||
|
import com.njcn.zlevent.service.ICsEventDetailService;
|
||||||
|
import com.njcn.zlevent.service.ICsEventService;
|
||||||
|
import com.njcn.zlevent.service.IEventService;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 类的介绍:
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @version 1.0.0
|
||||||
|
* @createTime 2023/8/14 9:32
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class EventServiceImpl implements IEventService {
|
||||||
|
|
||||||
|
private final CsLineFeignClient csLineFeignClient;
|
||||||
|
|
||||||
|
private final DicDataFeignClient dicDataFeignClient;
|
||||||
|
|
||||||
|
private final EpdFeignClient epdFeignClient;
|
||||||
|
|
||||||
|
private final RedisUtil redisUtil;
|
||||||
|
|
||||||
|
private final ICsEventService csEventService;
|
||||||
|
|
||||||
|
private final ICsEventDetailService csEventDetailService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Transactional(rollbackFor = Exception.class)
|
||||||
|
public void analysis(AppEventMessage appEventMessage) {
|
||||||
|
List<CsEvent> list1 = new ArrayList<>();
|
||||||
|
List<CsEventDetail> list2 = new ArrayList<>();
|
||||||
|
//获取监测点id
|
||||||
|
String lineId = null;
|
||||||
|
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
|
||||||
|
if (Objects.isNull(object1)){
|
||||||
|
lineInfo(appEventMessage.getId());
|
||||||
|
}
|
||||||
|
if (Objects.equals(appEventMessage.getDid(),1)){
|
||||||
|
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
|
||||||
|
} else if (Objects.equals(appEventMessage.getDid(),2)){
|
||||||
|
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get(appEventMessage.getMsg().getClDid().toString()).toString();
|
||||||
|
}
|
||||||
|
//处理事件数据
|
||||||
|
List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray();
|
||||||
|
for (AppEventMessage.DataArray item : dataArray) {
|
||||||
|
String id = IdUtil.fastSimpleUUID();
|
||||||
|
//事件入库
|
||||||
|
CsEvent csEvent = new CsEvent();
|
||||||
|
csEvent.setId(id);
|
||||||
|
csEvent.setLineId(lineId);
|
||||||
|
csEvent.setName(item.getName());
|
||||||
|
csEvent.setShowName(epdFeignClient.findByName(item.getName()).getData().getShowName());
|
||||||
|
csEvent.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
|
||||||
|
List<AppEventMessage.Param> params = item.getParam();
|
||||||
|
for (AppEventMessage.Param item2 : params) {
|
||||||
|
CsEventDetail csEventDetail = new CsEventDetail();
|
||||||
|
csEventDetail.setPid(id);
|
||||||
|
csEventDetail.setName(item2.getName());
|
||||||
|
csEventDetail.setShowName(epdFeignClient.findByName(item2.getName()).getData().getShowName());
|
||||||
|
csEventDetail.setType(item2.getType());
|
||||||
|
csEventDetail.setUnit(item2.getUnit());
|
||||||
|
if (Objects.equals(item2.getName(),"Evt_VVaPhas")){
|
||||||
|
csEventDetail.setPhasic(item2.getData().toString());
|
||||||
|
} else {
|
||||||
|
csEventDetail.setData(Double.parseDouble(item2.getData().toString()));
|
||||||
|
}
|
||||||
|
list2.add(csEventDetail);
|
||||||
|
}
|
||||||
|
list1.add(csEvent);
|
||||||
|
}
|
||||||
|
csEventService.saveBatch(list1);
|
||||||
|
csEventDetailService.saveBatch(list2);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 缓存监测点相关信息
|
||||||
|
*/
|
||||||
|
public void lineInfo(String id) {
|
||||||
|
Map<Integer,String> map = new HashMap<>();
|
||||||
|
List<CsLinePO> lineList = csLineFeignClient.findByNdid(id).getData();
|
||||||
|
if (CollectionUtil.isEmpty(lineList)){
|
||||||
|
throw new BusinessException(StatResponseEnum.LINE_NULL);
|
||||||
|
}
|
||||||
|
for (CsLinePO item : lineList) {
|
||||||
|
DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData();
|
||||||
|
if (Objects.isNull(dictData)){
|
||||||
|
throw new BusinessException(StatResponseEnum.DICT_NULL);
|
||||||
|
}
|
||||||
|
if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){
|
||||||
|
map.put(0,item.getLineId());
|
||||||
|
} else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){
|
||||||
|
map.put(1,item.getLineId());
|
||||||
|
} else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){
|
||||||
|
map.put(2,item.getLineId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_POSITION+id,map,600L);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 时间转换
|
||||||
|
*/
|
||||||
|
public LocalDateTime timeFormat(Long time1, Long time2) {
|
||||||
|
//设置格式
|
||||||
|
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||||
|
String timeText = format.format(time1 * 1000);
|
||||||
|
String time = timeText + "." + time2;
|
||||||
|
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
|
||||||
|
return LocalDateTime.parse(time, fmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -66,6 +66,11 @@
|
|||||||
<artifactId>system-api</artifactId>
|
<artifactId>system-api</artifactId>
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.njcn</groupId>
|
||||||
|
<artifactId>zl-event-api</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@@ -50,6 +50,9 @@ public class AppAutoDataConsumer extends EnhanceConsumerMessageHandler<AppAutoDa
|
|||||||
protected void handleMessage(AppAutoDataMessage appAutoDataMessage) {
|
protected void handleMessage(AppAutoDataMessage appAutoDataMessage) {
|
||||||
Integer dataAttr = appAutoDataMessage.getMsg().getDataAttr();
|
Integer dataAttr = appAutoDataMessage.getMsg().getDataAttr();
|
||||||
switch (dataAttr) {
|
switch (dataAttr) {
|
||||||
|
case 0:
|
||||||
|
log.info("分发至事件处理");
|
||||||
|
break;
|
||||||
case 1:
|
case 1:
|
||||||
log.info("分发至实时数据");
|
log.info("分发至实时数据");
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -0,0 +1,146 @@
|
|||||||
|
package com.njcn.message.consumer;
|
||||||
|
|
||||||
|
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
|
||||||
|
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
|
||||||
|
import com.njcn.mq.constant.BusinessTopic;
|
||||||
|
import com.njcn.mq.constant.MessageStatus;
|
||||||
|
import com.njcn.mq.message.AppEventMessage;
|
||||||
|
import com.njcn.redis.pojo.enums.RedisKeyEnum;
|
||||||
|
import com.njcn.redis.utils.RedisUtil;
|
||||||
|
import com.njcn.system.api.RocketMqLogFeignClient;
|
||||||
|
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
|
||||||
|
import com.njcn.zlevent.api.EventFeignClient;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 类的介绍:
|
||||||
|
*
|
||||||
|
* @author xuyang
|
||||||
|
* @version 1.0.0
|
||||||
|
* @createTime 2023/8/11 15:32
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
@RocketMQMessageListener(
|
||||||
|
topic = BusinessTopic.NJCN_APP_EVENT_TOPIC,
|
||||||
|
consumerGroup = BusinessTopic.NJCN_APP_EVENT_TOPIC,
|
||||||
|
selectorExpression = BusinessTopic.AppEventTag.EVENT_TAG,
|
||||||
|
consumeThreadNumber = 10,
|
||||||
|
enableMsgTrace = true
|
||||||
|
)
|
||||||
|
@Slf4j
|
||||||
|
public class AppEventConsumer extends EnhanceConsumerMessageHandler<AppEventMessage> implements RocketMQListener<AppEventMessage> {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RedisUtil redisUtil;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RocketMqLogFeignClient rocketMqLogFeignClient;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private EventFeignClient eventFeignClient;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void handleMessage(AppEventMessage appEventMessage) {
|
||||||
|
Integer dataType = appEventMessage.getMsg().getDataType();
|
||||||
|
switch (dataType) {
|
||||||
|
case 6:
|
||||||
|
log.info("分发至暂态事件处理");
|
||||||
|
eventFeignClient.analysis(appEventMessage);
|
||||||
|
break;
|
||||||
|
case 16:
|
||||||
|
log.info("分发至录波处理");
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 通过redis分布式锁判断当前消息所处状态
|
||||||
|
* 1、null 查不到该key的数据,属于第一次消费,放行
|
||||||
|
* 2、fail 上次消息消费时发生异常,放行
|
||||||
|
* 3、being processed 正在处理,打回去
|
||||||
|
* 4、success 最近72小时消费成功,避免重复消费,打回去
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean filter(AppEventMessage message) {
|
||||||
|
String keyStatus = redisUtil.getStringByKey(message.getKey());
|
||||||
|
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
|
||||||
|
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.BEING_PROCESSED, 60L);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消费成功,缓存到redis72小时,避免重复消费
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void consumeSuccess(AppEventMessage message) {
|
||||||
|
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发生异常时,进行错误信息入库保存
|
||||||
|
* 默认没有实现类,子类可以实现该方法,调用feign接口进行入库保存
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void saveExceptionMsgLog(AppEventMessage message, String identity, Exception exception) {
|
||||||
|
redisUtil.saveByKeyWithExpire(message.getKey(), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
|
||||||
|
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
|
||||||
|
rocketmqMsgErrorLog.setMsgKey(message.getKey());
|
||||||
|
rocketmqMsgErrorLog.setResource(message.getSource());
|
||||||
|
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
|
||||||
|
//数据库字段配置长度200,避免插入失败,大致分析异常原因
|
||||||
|
String exceptionMsg = exception.getMessage();
|
||||||
|
if(exceptionMsg.length() > 200){
|
||||||
|
exceptionMsg = exceptionMsg.substring(0,180);
|
||||||
|
}
|
||||||
|
rocketmqMsgErrorLog.setRecord(exceptionMsg);
|
||||||
|
//如果是当前消息重试的则略过
|
||||||
|
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
|
||||||
|
//单次消费异常
|
||||||
|
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
|
||||||
|
//重试N次后,依然消费异常
|
||||||
|
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 处理失败后,是否重试
|
||||||
|
* 一般开启
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected boolean isRetry() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 消费失败是否抛出异常,抛出异常后就不再消费了
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected boolean throwException() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/***
|
||||||
|
* 调用父类handler处理消息的元信息
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void onMessage(AppEventMessage appEventMessage) {
|
||||||
|
super.dispatchMessage(appEventMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
1
pom.xml
1
pom.xml
@@ -34,6 +34,7 @@
|
|||||||
<!--中间件目标地址-->
|
<!--中间件目标地址-->
|
||||||
<middle.server.url>192.168.1.13</middle.server.url>
|
<middle.server.url>192.168.1.13</middle.server.url>
|
||||||
<!--微服务模块发布地址-->
|
<!--微服务模块发布地址-->
|
||||||
|
<!-- <service.server.url>192.168.1.13</service.server.url>-->
|
||||||
<service.server.url>127.0.0.1</service.server.url>
|
<service.server.url>127.0.0.1</service.server.url>
|
||||||
<!--docker仓库地址-->
|
<!--docker仓库地址-->
|
||||||
<docker.server.url>192.168.1.13</docker.server.url>
|
<docker.server.url>192.168.1.13</docker.server.url>
|
||||||
|
|||||||
Reference in New Issue
Block a user