diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java
index a05bb4a..03d0dc9 100644
--- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java
+++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/EventDto.java
@@ -82,6 +82,79 @@ public class EventDto {
@SerializedName("Parm")
private List param;
+
+
+ @SerializedName("PrjName")
+ @ApiModelProperty("工程名称")
+ private String prjName;
+
+ @SerializedName("PrjTimeStart")
+ @ApiModelProperty("装置启动时间")
+ private Long prjTimeStart;
+
+ @SerializedName("PrjTimeEnd")
+ @ApiModelProperty("装置结束时间")
+ private Long prjTimeEnd;
+
+ @SerializedName("PrjDataPath")
+ @ApiModelProperty("装置数据路径")
+ private String prjDataPath;
+
+ @SerializedName("DevType")
+ @ApiModelProperty("装置型号")
+ private String devType;
+
+ @SerializedName("DevMac")
+ @ApiModelProperty("装置mac地址")
+ private String devMac;
+
+ @SerializedName("AppVersion")
+ @ApiModelProperty("装置程序版本")
+ private String appVersion;
+
+ @SerializedName("Cldid")
+ @ApiModelProperty("逻辑子设备id")
+ private Integer clDid;
+
+ @SerializedName("StatCycle")
+ @ApiModelProperty("统计间隔")
+ private Integer statCycle;
+
+ @SerializedName("VolGrade")
+ @ApiModelProperty("电压等级")
+ private Float volGrade;
+
+ @SerializedName("VolConType")
+ @ApiModelProperty("电压接线方式(0-星型, 1-角型, 2-V型)")
+ private Integer volConType;
+
+ @SerializedName("CurConSel")
+ @ApiModelProperty("电流接线方式(0-正常, 1-合成IB, 2-合成IC)")
+ private Integer curConSel;
+
+ @SerializedName("PtRatio")
+ @ApiModelProperty("PT变比")
+ private Integer ptRatio;
+
+ @SerializedName("CtRatio")
+ @ApiModelProperty("ct变比")
+ private Integer ctRatio;
+
+ @SerializedName("CapacitySscb")
+ @ApiModelProperty("基准短路容量")
+ private Float capacitySscb;
+
+ @SerializedName("CapacitySscmin")
+ @ApiModelProperty("最小短路容量")
+ private Float capacitySscmin;
+
+ @SerializedName("CapacitySt")
+ @ApiModelProperty("供电设备容量")
+ private Float capacitySt;
+
+ @SerializedName("CapacitySi")
+ @ApiModelProperty("用户协议容量")
+ private Float capacitySi;
}
@Data
diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java
index 057be14..393b187 100644
--- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java
+++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java
@@ -105,6 +105,16 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
executeMainTask(scheduler,nDid,version);
}
+ //自动接入
+ else if (expiredKey.startsWith("autoAccess")) {
+ List list = csEquipmentDeliveryService.getAll();
+ list.forEach(item->{
+ String version = csTopicService.getVersion(item.getNdid());
+ if (!Objects.isNull(version)){
+ csDeviceService.devAccessAskTemplate(item.getNdid(),version,1);
+ }
+ });
+ }
}
//主任务
diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java
index fd2a1f9..ea756a2 100644
--- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java
+++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AccessApplicationRunner.java
@@ -1,17 +1,12 @@
package com.njcn.access.runner;
-import com.njcn.access.service.ICsEquipmentDeliveryService;
-import com.njcn.access.service.ICsTopicService;
-import com.njcn.access.service.impl.CsDeviceServiceImpl;
-import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
+import com.njcn.redis.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
-import java.util.List;
-import java.util.Objects;
/**
* 类的介绍:用来重新发起设备的接入,存在程序意外停止了,缓存失效导致无法更新装置的状态,所以需要在程序启动时发起设备的接入
@@ -25,21 +20,11 @@ import java.util.Objects;
public class AccessApplicationRunner implements ApplicationRunner {
@Resource
- private CsDeviceServiceImpl csDeviceService;
- @Resource
- private ICsTopicService csTopicService;
- @Resource
- private ICsEquipmentDeliveryService csEquipmentDeliveryService;
+ private RedisUtil redisUtil;
@Override
- public void run(ApplicationArguments args){
- List list = csEquipmentDeliveryService.getAll();
- list.forEach(item->{
- String version = csTopicService.getVersion(item.getNdid());
- if (!Objects.isNull(version)){
- csDeviceService.devAccessAskTemplate(item.getNdid(),version,1);
- }
- });
+ public void run(ApplicationArguments args) {
+ redisUtil.saveByKeyWithExpire("autoAccess",null,60L);
}
}
diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java
index b4b57bb..3c278be 100644
--- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java
+++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java
@@ -42,6 +42,7 @@ import com.njcn.system.pojo.vo.DictTreeVO;
import com.njcn.web.utils.RequestUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
@@ -64,49 +65,26 @@ import java.util.stream.Collectors;
public class CsDevModelServiceImpl implements ICsDevModelService {
private final FileStorageUtil fileStorageUtil;
-
private final DevModelFeignClient devModelFeignClient;
-
private final EpdFeignClient epdFeignClient;
-
private final DicDataFeignClient dicDataFeignClient;
-
private final EleEvtFeignClient eleEvtFeignClient;
-
private final ICsDataSetService csDataSetService;
-
private final ICsDataArrayService csDataArrayService;
-
private final CsDevModelMapper csDevModelMapper;
-
private final ICsLineModelService csLineModelService;
-
private final ICsGroupService csGroupService;
-
private final ICsGroArrService csGroArrService;
-
private final CsLogsFeignClient csLogsFeignClient;
-
private final EleWaveFeignClient waveFeignClient;
-
private final DictTreeFeignClient dictTreeFeignClient;
-
private final MqttPublisher publisher;
-
private final ICsTopicService csTopicService;
-
- private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
-
private final RedisUtil redisUtil;
- private final MqttMessageHandler mqttMessageHandler;
-
@Override
public void refreshDevModelCache() {
-
-
-
}
@Override
diff --git a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/WlRecordFeignClient.java b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/WlRecordFeignClient.java
deleted file mode 100644
index b58111b..0000000
--- a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/WlRecordFeignClient.java
+++ /dev/null
@@ -1,21 +0,0 @@
-//package com.njcn.stat.api;
-//
-//import com.njcn.common.pojo.constant.ServerInfo;
-//import com.njcn.common.pojo.response.HttpResult;
-//import com.njcn.mq.message.AppAutoDataMessage;
-//import com.njcn.stat.api.fallback.WlRecordClientFallbackFactory;
-//import org.springframework.cloud.openfeign.FeignClient;
-//import org.springframework.validation.annotation.Validated;
-//import org.springframework.web.bind.annotation.PostMapping;
-//import org.springframework.web.bind.annotation.RequestBody;
-//
-///**
-// * @author xy
-// */
-//@FeignClient(value = ServerInfo.CS_STAT_BOOT, path = "/record", fallbackFactory = WlRecordClientFallbackFactory.class,contextId = "record")
-//public interface WlRecordFeignClient {
-//
-// @PostMapping("/addOrUpdateBaseData")
-// HttpResult addOrUpdateBaseData(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage);
-//
-//}
diff --git a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/WlRecordClientFallbackFactory.java b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/WlRecordClientFallbackFactory.java
deleted file mode 100644
index 60ae9bf..0000000
--- a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/WlRecordClientFallbackFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-//package com.njcn.stat.api.fallback;
-//
-//import com.njcn.common.pojo.enums.response.CommonResponseEnum;
-//import com.njcn.common.pojo.exception.BusinessException;
-//import com.njcn.common.pojo.response.HttpResult;
-//import com.njcn.mq.message.AppAutoDataMessage;
-//import com.njcn.stat.api.WlRecordFeignClient;
-//import feign.hystrix.FallbackFactory;
-//import lombok.extern.slf4j.Slf4j;
-//import org.springframework.stereotype.Component;
-//
-///**
-// * @author xy
-// */
-//@Slf4j
-//@Component
-//public class WlRecordClientFallbackFactory implements FallbackFactory {
-// @Override
-// public WlRecordFeignClient create(Throwable cause) {
-// //判断抛出异常是否为解码器抛出的业务异常
-// Enum> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
-// if (cause.getCause() instanceof BusinessException) {
-// BusinessException businessException = (BusinessException) cause.getCause();
-// }
-// Enum> finalExceptionEnum = exceptionEnum;
-// return new WlRecordFeignClient() {
-//
-// @Override
-// public HttpResult addOrUpdateBaseData(AppAutoDataMessage appAutoDataMessage) {
-// log.error("{}异常,降级处理,异常为:{}","新增或更新装置基础数据",cause.toString());
-// throw new BusinessException(finalExceptionEnum);
-// }
-// };
-// }
-//}
diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java
index c9d02b9..bfea268 100644
--- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java
+++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/EventFeignClient.java
@@ -6,6 +6,7 @@ import com.njcn.mq.message.AppEventMessage;
import com.njcn.zlevent.api.fallback.EventClientFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
/**
* @author xy
@@ -16,7 +17,7 @@ public interface EventFeignClient {
@PostMapping("/analysis")
HttpResult analysis(AppEventMessage appEventMessage);
- @PostMapping("/errorEvent")
- HttpResult insertErrorEvent(AppEventMessage appEventMessage);
+ @PostMapping("/portableData")
+ HttpResult getPortableData(@RequestBody AppEventMessage appEventMessage);
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java
index 495882d..d2555aa 100644
--- a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java
+++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/api/fallback/EventClientFallbackFactory.java
@@ -32,10 +32,11 @@ public class EventClientFallbackFactory implements FallbackFactory insertErrorEvent(AppEventMessage appEventMessage) {
- log.error("{}异常,降级处理,异常为:{}","异常事件统计",cause.toString());
+ public HttpResult getPortableData(AppEventMessage appEventMessage) {
+ log.error("{}异常,降级处理,异常为:{}","便携式设备数据记录动作",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
+
};
}
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java
index 4f99edf..4a82214 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/EventController.java
@@ -28,7 +28,7 @@ import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/event")
-@Api(tags = "暂态事件处理")
+@Api(tags = "事件处理")
@AllArgsConstructor
public class EventController extends BaseController {
@@ -44,4 +44,14 @@ public class EventController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
+ @OperateInfo(info = LogEnum.BUSINESS_COMMON)
+ @PostMapping("/portableData")
+ @ApiOperation("便携式数据事件")
+ @ApiImplicitParam(name = "appEventMessage", value = "数据实体", required = true)
+ public HttpResult getPortableData(@RequestBody AppEventMessage appEventMessage){
+ String methodDescribe = getMethodDescribe("getPortableData");
+ eventService.getPortableData(appEventMessage);
+ return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
+ }
+
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java
index 59a27e0..fdb93cc 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/IEventService.java
@@ -17,4 +17,12 @@ public interface IEventService {
*/
void analysis(AppEventMessage appEventMessage);
+ /**
+ * 便携式设备基础数据
+ * 1.装置发起数据记录开始动作,库中新增数据;
+ * 2.装置发起数据记录结束动作,库中更新数据;
+ * @param appEventMessage
+ */
+ void getPortableData(AppEventMessage appEventMessage);
+
}
diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
index 869d48d..b180760 100644
--- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
+++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java
@@ -6,8 +6,12 @@ import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
+import com.njcn.csdevice.api.WlRecordFeignClient;
+import com.njcn.csdevice.pojo.param.WlRecordParam;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.csdevice.pojo.po.CsLinePO;
+import com.njcn.csdevice.pojo.po.WlRecord;
+import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.csharmonic.pojo.po.CsEventPO;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influx.utils.InfluxDbUtils;
@@ -31,11 +35,15 @@ import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
+import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import java.math.BigDecimal;
import java.text.SimpleDateFormat;
+import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -53,22 +61,15 @@ import java.util.concurrent.TimeUnit;
public class EventServiceImpl implements IEventService {
private final CsLineFeignClient csLineFeignClient;
-
private final DicDataFeignClient dicDataFeignClient;
-
private final EpdFeignClient epdFeignClient;
-
private final RedisUtil redisUtil;
-
private final ICsEventService csEventService;
-
private final EquipmentFeignClient equipmentFeignClient;
-
private final InfluxDbUtils influxDbUtils;
-
private final ICsEventLogsService csEventLogsService;
-
private final SendEventUtils sendEventUtils;
+ private final WlRecordFeignClient wlRecordFeignClient;
@Override
@Transactional(rollbackFor = Exception.class)
@@ -188,6 +189,93 @@ public class EventServiceImpl implements IEventService {
}
}
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public void getPortableData(AppEventMessage appEventMessage) {
+ CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(appEventMessage.getId()).getData();
+ //根据报文判断是新增还是更新
+ List dataArrays = appEventMessage.getMsg().getDataArray();
+ dataArrays.forEach(item->{
+ WlRecordParam.Record param = new WlRecordParam.Record();
+ param.setDevId(vo.getId());
+ param.setLineId(appEventMessage.getId() + item.getClDid().toString());
+ param.setProName(item.getPrjName());
+ param.setProStartTime(timestampToDatetime((item.getPrjTimeStart() - 8*3600)));
+ WlRecord record = wlRecordFeignClient.findDevBaseData(param).getData();
+ if (!Objects.isNull(record)) {
+ if (!Objects.equals(item.getPrjTimeEnd(),-1L)) {
+ WlRecordParam.UpdateRecord wlRecord = new WlRecordParam.UpdateRecord();
+ wlRecord.setId(record.getId());
+ wlRecord.setProEndTime(timestampToDatetime((item.getPrjTimeEnd() - 8*3600)));
+ wlRecordFeignClient.updateTestRecord(wlRecord);
+ }
+ } else {
+ //新项目入库
+ WlRecord wlRecord = new WlRecord();
+ wlRecord.setId(IdUtil.simpleUUID());
+ wlRecord.setItemName("基础数据");
+ wlRecord.setGcName(item.getPrjName());
+ wlRecord.setDevId(vo.getId());
+ wlRecord.setLineId(appEventMessage.getId() + item.getClDid().toString());
+ wlRecord.setStatisticalInterval(item.getStatCycle());
+ wlRecord.setPt(item.getPtRatio());
+ wlRecord.setCt(item.getCtRatio());
+ //电压等级
+ DictData dictData = dicDataFeignClient.getDicDataByCodeAndType(channelVol(item.getVolGrade()) + "kV","Dev_Voltage_Stand").getData();
+ wlRecord.setVoltageLevel(Objects.isNull(dictData)?null:dictData.getId());
+ wlRecord.setCapacitySscb(item.getCapacitySscb());
+ wlRecord.setCapacitySscmin(item.getCapacitySscmin());
+ wlRecord.setCapacitySt(item.getCapacitySt());
+ wlRecord.setCapacitySi(item.getCapacitySi());
+ //电压接线方式
+ wlRecord.setVolConType(getVolConType(item.getVolConType()));
+ //fixme 电流接线方式 这边系统没有字典,录入字典通用性不强,采用装置上送值存储
+ wlRecord.setCurConSel(item.getCurConSel().toString());
+ wlRecord.setStartTime(timestampToDatetime((item.getPrjTimeStart() - 8*3600)));
+ wlRecord.setType(1);
+ wlRecord.setState(1);
+ wlRecord.setGcDataPath(item.getPrjDataPath());
+ wlRecordFeignClient.addBaseData(wlRecord);
+ }
+ });
+ }
+
+ /**
+ * 处理电压
+ * @param vol
+ * @return
+ */
+ public String channelVol(Float vol) {
+ BigDecimal value = new BigDecimal(vol);
+ BigDecimal noZeros = value.stripTrailingZeros();
+ return noZeros.toPlainString();
+ }
+
+ // 0-星型, 1-角型, 2-V型
+ // star-星型、Star_Triangle-星三角、Open_Delta-开口三角
+ public String getVolConType(Integer volConType) {
+ String result = null;
+ String dictDataCode = null;
+ switch (volConType) {
+ case 0:
+ dictDataCode = "star";
+ break;
+ case 1:
+ dictDataCode = "Star_Triangle";
+ break;
+ case 2:
+ dictDataCode = "Open_Delta";
+ break;
+ default:
+ break;
+ }
+ if (!Objects.isNull(dictDataCode)) {
+ DictData dictData = dicDataFeignClient.getDicDataByCodeAndType(dictDataCode,"Dev_Connect").getData();
+ result = dictData.getId();
+ }
+ return Objects.isNull(result)?null:result;
+ }
+
/**
* 缓存监测点相关信息
*/
@@ -248,4 +336,9 @@ public class EventServiceImpl implements IEventService {
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
return LocalDateTime.parse(time, fmt);
}
+
+ public LocalDateTime timestampToDatetime(long timestamp){
+ Instant instant = Instant.ofEpochSecond(timestamp);
+ return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
+ }
}
diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java
index 3f26345..0a0b96f 100644
--- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java
+++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java
@@ -70,6 +70,7 @@ public class AppEventConsumer extends EnhanceConsumerMessageHandler