diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java index 353dce7..635390b 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/TypeEnum.java @@ -28,7 +28,7 @@ public enum TypeEnum { TYPE_12("8501","设备目录创建"), TYPE_13("8502","设备根目录查询"), TYPE_14("8705","设备心跳应答 "), - TYPE_15("9217","设备数据主动上送应答"), + TYPE_15("9218","设备数据主动上送应答"), TYPE_16("4609","设备支持主题应答"), TYPE_17("4610","联网装置应答注册请求"), TYPE_18("4611","设备模板信息应答 "), diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/AutoDataDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/AutoDataDto.java new file mode 100644 index 0000000..eb8c16d --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/AutoDataDto.java @@ -0,0 +1,77 @@ +package com.njcn.access.pojo.dto; + +import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.List; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/11 15:06 + */ +@Data +public class AutoDataDto { + + @SerializedName("Mid") + private Integer mid; + + @SerializedName("Did") + @ApiModelProperty("逻辑设备 治理逻辑设备为1 电能质量设备为2") + private Integer did; + + @SerializedName("Pri") + private Integer pri; + + @SerializedName("Type") + private Integer type; + + @SerializedName("Msg") + private Msg msg; + + @Data + public static class Msg{ + + @SerializedName("Cldid") + @ApiModelProperty("逻辑子设备 治理逻辑设备为0 电能质量设备为1、2") + private Integer clDid; + + @SerializedName("DataType") + private Integer dataType; + + @SerializedName("DataAttr") + @ApiModelProperty("数据属性:无-0、实时-1、统计-2") + private Integer dataAttr; + + @SerializedName("DsNameIdx") + private Integer dsNameIdx; + + @SerializedName("DataArray") + private List dataArray; + } + + @Data + public static class DataArray{ + + @SerializedName("DataAttr") + @ApiModelProperty("数据属性 -1-无 0-Rt(实时) 1-Max 2-Min 3-Avg 4-Cp95") + private Integer dataAttr; + + @SerializedName("DataTimeSec") + private Long dataTimeSec; + + @SerializedName("DataTimeUSec") + private Integer dataTimeUSec; + + @SerializedName("DataTag") + @ApiModelProperty("数据是否参与合格率统计") + private Integer dataTag; + + @SerializedName("Data") + private String data; + } + +} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDataArrayPO.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDataArrayPO.java deleted file mode 100644 index b1efca7..0000000 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDataArrayPO.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.njcn.access.pojo.po; - -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; -import com.github.jeffreyning.mybatisplus.anno.MppMultiId; -import com.njcn.db.bo.BaseEntity; -import java.io.Serializable; - -import lombok.Data; -import lombok.Getter; -import lombok.Setter; - -/** - *

- * 详细数据表 - *

- * - * @author xuyang - * @since 2023-05-23 - */ -@Data -@TableName("cs_data_array") -public class CsDataArrayPO extends BaseEntity { - - private static final long serialVersionUID = 1L; - - /** - * id - */ - @TableId(value = "id") - private String id; - - /** - * 数据集表id(cs_data_set) - */ - private String pid; - - /** - * 数据字典表id(cs_pqd_epd、cs_md..) - */ - private String dataId; - - /** - * 数据名称 - */ - private String name; - - /** - * 数据别名 - */ - private String anotherName; - - /** - * 字典序号 - */ - private Integer idx; - - /** - * 排序(数据解析序号) - */ - private Integer sort; - - /** - * 数据统计方法(max、min、avg、cp95) - */ - private String statMethod; - - /** - * 数据类型(Float) - */ - private String dataType; - - /** - * 相别(A、B、C...) - */ - private String phase; - - /** - * influxdb表名 - */ - private String classId; - - -} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDataSetPO.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDataSetPO.java deleted file mode 100644 index db270c2..0000000 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDataSetPO.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.njcn.access.pojo.po; - -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; -import com.github.jeffreyning.mybatisplus.anno.MppMultiId; -import com.njcn.db.bo.BaseEntity; -import java.io.Serializable; - -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; - -/** - *

- * 数据集表 - *

- * - * @author xuyang - * @since 2023-05-23 - */ -@EqualsAndHashCode(callSuper = true) -@Data -@TableName("cs_data_set") -public class CsDataSetPO extends BaseEntity { - - private static final long serialVersionUID = 1L; - - /** - * Id - */ - @TableId(value = "id") - private String id; - - /** - * 装置数据模板表Id(cs_dev_model) - */ - private String pid; - - /** - * 数据集名称 - */ - private String name; - - /** - * 数据集别名 - */ - private String anotherName; - - /** - * 字典序号 - */ - private Integer idx; - - /** - * 数据类型(Rt:实时数据、Stat:统计数据) - */ - private String dataType; - - /** - * 周期 - */ - private Integer period; - - /** - * 是否存储 0:不存储 1:存储 - */ - private Integer storeFlag; - - /** - * 逻辑子设备id(从模板获取,主设备此字段没有值) - */ - private Integer clDev; - - /** - * 数据集类型(0:主设备 1:模块 2:监测设备) - */ - private Integer type; - - /** - * 数据模型 - */ - private String dataList; - - -} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDict.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDict.java deleted file mode 100644 index 6c08fa3..0000000 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDict.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.njcn.access.pojo.po; - -import com.baomidou.mybatisplus.annotation.TableName; -import com.njcn.db.bo.BaseEntity; -import java.io.Serializable; -import java.time.LocalDateTime; - -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; - -/** - *

- * - *

- * - * @author xuyang - * @since 2023-05-18 - */ -@EqualsAndHashCode(callSuper = true) -@Data -@TableName("cs_dict") -public class CsDict extends BaseEntity { - - private static final long serialVersionUID = 1L; - - private String id; - - private String pid; - - private String name; - - private String anotherName; - - private Boolean status; - - private Integer sort; - - private String createBy; - - private LocalDateTime createTime; - - private String updateBy; - - private LocalDateTime updateTime; - - -} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsNetDevPO.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsNetDevPO.java deleted file mode 100644 index 344726e..0000000 --- a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsNetDevPO.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.njcn.access.pojo.po; - -import com.baomidou.mybatisplus.annotation.TableName; -import com.njcn.db.bo.BaseEntity; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; - -import java.time.LocalDate; - -/** - *

- * 联网设备表 - *

- * - * @author xuyang - * @since 2023-05-17 - */ -@Data -@EqualsAndHashCode(callSuper = true) -@TableName("cs_net_dev") -public class CsNetDevPO extends BaseEntity { - - private static final long serialVersionUID = 1L; - - /** - * id - */ - private String id; - - /** - * 装置型号 - */ - private String devType; - - private LocalDate time; - - /** - * 版本号 - */ - private String version; - - /** - * 系统软件表Id - */ - private String softInfoId; - - /** - * 工程配置表Id - */ - private String prjInfoId; - - /** - * 状态 - */ - private Integer status; - - -} diff --git a/iot-access/access-boot/pom.xml b/iot-access/access-boot/pom.xml index 91efac1..1749c49 100644 --- a/iot-access/access-boot/pom.xml +++ b/iot-access/access-boot/pom.xml @@ -58,6 +58,11 @@ org.springframework.kafka spring-kafka + + com.njcn + common-mq + ${project.version} + diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDataArrayController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDataArrayController.java index b390a31..710b9e2 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDataArrayController.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDataArrayController.java @@ -1,15 +1,13 @@ package com.njcn.access.controller; -import com.njcn.access.pojo.po.CsDataArrayPO; -import com.njcn.access.pojo.po.CsDataSetPO; import com.njcn.access.service.ICsDataArrayService; -import com.njcn.access.service.ICsDataSetService; import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; +import com.njcn.csdevice.pojo.po.CsDataArray; import com.njcn.web.controller.BaseController; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; @@ -46,7 +44,7 @@ public class CsDataArrayController extends BaseController { @PostMapping("/add") @ApiOperation("新增详细数据集") @ApiImplicitParam(name = "csDataArrayPo", value = "数据集", required = true) - public HttpResult add(@RequestBody CsDataArrayPO csDataArrayPo){ + public HttpResult add(@RequestBody CsDataArray csDataArrayPo){ String methodDescribe = getMethodDescribe("add"); csDataArrayService.add(csDataArrayPo); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); @@ -56,7 +54,7 @@ public class CsDataArrayController extends BaseController { @PostMapping("/addList") @ApiOperation("批量新增详细数据集") @ApiImplicitParam(name = "list", value = "数据集集合", required = true) - public HttpResult addList(@RequestBody List list){ + public HttpResult addList(@RequestBody List list){ String methodDescribe = getMethodDescribe("addList"); csDataArrayService.addList(list); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDataSetController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDataSetController.java index 83abba1..c8b0dfe 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDataSetController.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDataSetController.java @@ -1,25 +1,23 @@ package com.njcn.access.controller; -import com.njcn.access.pojo.po.CsDataSetPO; import com.njcn.access.service.ICsDataSetService; -import com.njcn.access.service.ICsDevModelService; import com.njcn.common.pojo.annotation.OperateInfo; import com.njcn.common.pojo.enums.common.LogEnum; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.utils.HttpResultUtil; +import com.njcn.csdevice.pojo.po.CsDataSet; +import com.njcn.web.controller.BaseController; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.validation.annotation.Validated; -import org.springframework.web.bind.annotation.*; - -import com.njcn.web.controller.BaseController; -import org.springframework.web.multipart.MultipartFile; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; import springfox.documentation.annotations.ApiIgnore; import java.util.List; @@ -46,9 +44,9 @@ public class CsDataSetController extends BaseController { @PostMapping("/add") @ApiOperation("新增数据集") @ApiImplicitParam(name = "csDataSetPo", value = "数据集", required = true) - public HttpResult add(@RequestBody CsDataSetPO csDataSetPo){ + public HttpResult add(@RequestBody CsDataSet csDataSet){ String methodDescribe = getMethodDescribe("add"); - csDataSetService.add(csDataSetPo); + csDataSetService.add(csDataSet); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); } @@ -56,7 +54,7 @@ public class CsDataSetController extends BaseController { @PostMapping("/addList") @ApiOperation("批量新增数据集") @ApiImplicitParam(name = "list", value = "数据集集合", required = true) - public HttpResult addList(@RequestBody List list){ + public HttpResult addList(@RequestBody List list){ String methodDescribe = getMethodDescribe("addList"); csDataSetService.addList(list); return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index d23244f..6d71d6a 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -2,6 +2,7 @@ package com.njcn.access.handler; import com.alibaba.excel.util.CollectionUtils; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.tocrhz.mqtt.annotation.MqttSubscribe; import com.github.tocrhz.mqtt.annotation.NamedValue; @@ -20,13 +21,15 @@ import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsLineModelService; import com.njcn.access.service.ICsTopicService; import com.njcn.common.pojo.exception.BusinessException; -import com.njcn.common.utils.PubUtils; import com.njcn.csdevice.api.DevModelFeignClient; import com.njcn.csdevice.pojo.po.CsDevModelPO; +import com.njcn.mq.message.AppAutoDataMessage; +import com.njcn.mq.template.AppAutoDataMessageTemplate; import com.njcn.redis.utils.RedisUtil; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -60,6 +63,8 @@ public class MqttMessageHandler { private final ICsEquipmentDeliveryService csEquipmentDeliveryService; + private final AppAutoDataMessageTemplate appAutoDataMessageTemplate; + @Autowired Validator validator; @@ -232,7 +237,7 @@ public class MqttMessageHandler { /** - * 装置心跳 + * 装置心跳 && 主动数据上送 * @param topic * @param message * @param version @@ -260,7 +265,7 @@ public class MqttMessageHandler { reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_29.getCode())); reqAndResParam.setCode(200); reqAndResParam.setMsg(heartBeatDto); - publisher.send("/Dev/DataRsp/"+version+"/"+nDid,PubUtils.obj2json(reqAndResParam),1,false); + publisher.send("/Dev/DataRsp/"+version+"/"+nDid,gson.toJson(reqAndResParam),1,false); //处理业务逻辑 Object object = res.getMsg(); if (!Objects.isNull(object)){ @@ -276,11 +281,25 @@ public class MqttMessageHandler { break; case 4866: //处理主动上送数据 - + //todo 将消息发送给rocketMQ,判断消息类型,需要回复 + AutoDataDto dataDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), AutoDataDto.class); + JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(dataDto)); + AppAutoDataMessage appAutoDataMessage = JSONObject.toJavaObject(jsonObject, AppAutoDataMessage.class); + appAutoDataMessage.setId(nDid); + appAutoDataMessageTemplate.sendMember(appAutoDataMessage); + //mid大于0,则需要应答设备侧 + if (dataDto.getMid() > 0){ + ReqAndResDto.Res response = new ReqAndResDto.Res(); + response.setMid(dataDto.getMid()); + response.setDid(dataDto.getDid()); + response.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + response.setType(Integer.parseInt(TypeEnum.TYPE_15.getCode())); + response.setCode(200); + publisher.send("/Dev/DataRsp/"+version+"/"+nDid,new Gson().toJson(response),1,false); + } break; default: break; } } - } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDataArrayMapper.java b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDataArrayMapper.java index c8d9204..da72d90 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDataArrayMapper.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDataArrayMapper.java @@ -1,7 +1,7 @@ package com.njcn.access.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.njcn.access.pojo.po.CsDataArrayPO; +import com.njcn.csdevice.pojo.po.CsDataArray; /** *

@@ -11,6 +11,6 @@ import com.njcn.access.pojo.po.CsDataArrayPO; * @author xuyang * @since 2023-08-01 */ -public interface CsDataArrayMapper extends BaseMapper { +public interface CsDataArrayMapper extends BaseMapper { } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDataSetMapper.java b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDataSetMapper.java index d8bbc32..5e053bb 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDataSetMapper.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDataSetMapper.java @@ -1,7 +1,7 @@ package com.njcn.access.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.njcn.access.pojo.po.CsDataSetPO; +import com.njcn.csdevice.pojo.po.CsDataSet; /** *

@@ -11,6 +11,6 @@ import com.njcn.access.pojo.po.CsDataSetPO; * @author xuyang * @since 2023-08-01 */ -public interface CsDataSetMapper extends BaseMapper { +public interface CsDataSetMapper extends BaseMapper { } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDataArrayService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDataArrayService.java index be5ec34..c76bec6 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDataArrayService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDataArrayService.java @@ -1,8 +1,7 @@ package com.njcn.access.service; import com.baomidou.mybatisplus.extension.service.IService; -import com.njcn.access.pojo.po.CsDataArrayPO; -import com.njcn.access.pojo.po.CsDataSetPO; +import com.njcn.csdevice.pojo.po.CsDataArray; import java.util.List; @@ -14,17 +13,17 @@ import java.util.List; * @author xuyang * @since 2023-08-01 */ -public interface ICsDataArrayService extends IService { +public interface ICsDataArrayService extends IService { /** * 新增详细数据集 * @param csDataArrayPo */ - void add(CsDataArrayPO csDataArrayPo); + void add(CsDataArray csDataArrayPo); /** * 批量新增数据集 * @param list */ - void addList(List list); + void addList(List list); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDataSetService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDataSetService.java index 2654f5e..614e2e5 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDataSetService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDataSetService.java @@ -1,7 +1,7 @@ package com.njcn.access.service; import com.baomidou.mybatisplus.extension.service.IService; -import com.njcn.access.pojo.po.CsDataSetPO; +import com.njcn.csdevice.pojo.po.CsDataSet; import java.util.List; @@ -13,17 +13,17 @@ import java.util.List; * @author xuyang * @since 2023-08-01 */ -public interface ICsDataSetService extends IService { +public interface ICsDataSetService extends IService { /** * 新增数据集 * @param csDataSetPo */ - void add(CsDataSetPO csDataSetPo); + void add(CsDataSet csDataSetPo); /** * 批量新增数据集 * @param list */ - void addList(List list); + void addList(List list); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataArrayServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataArrayServiceImpl.java index c31fea9..eea126e 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataArrayServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataArrayServiceImpl.java @@ -2,8 +2,8 @@ package com.njcn.access.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.access.mapper.CsDataArrayMapper; -import com.njcn.access.pojo.po.CsDataArrayPO; import com.njcn.access.service.ICsDataArrayService; +import com.njcn.csdevice.pojo.po.CsDataArray; import org.springframework.stereotype.Service; import java.util.List; @@ -17,16 +17,16 @@ import java.util.List; * @since 2023-08-01 */ @Service -public class CsDataArrayServiceImpl extends ServiceImpl implements ICsDataArrayService { +public class CsDataArrayServiceImpl extends ServiceImpl implements ICsDataArrayService { @Override - public void add(CsDataArrayPO csDataArrayPo) { + public void add(CsDataArray csDataArrayPo) { this.save(csDataArrayPo); } @Override - public void addList(List list) { + public void addList(List list) { this.saveBatch(list,1000); } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataSetServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataSetServiceImpl.java index 134b11a..487d0be 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataSetServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDataSetServiceImpl.java @@ -2,8 +2,8 @@ package com.njcn.access.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.access.mapper.CsDataSetMapper; -import com.njcn.access.pojo.po.CsDataSetPO; import com.njcn.access.service.ICsDataSetService; +import com.njcn.csdevice.pojo.po.CsDataSet; import org.springframework.stereotype.Service; import java.util.List; @@ -17,15 +17,15 @@ import java.util.List; * @since 2023-08-01 */ @Service -public class CsDataSetServiceImpl extends ServiceImpl implements ICsDataSetService { +public class CsDataSetServiceImpl extends ServiceImpl implements ICsDataSetService { @Override - public void add(CsDataSetPO csDataSetPo) { + public void add(CsDataSet csDataSetPo) { this.save(csDataSetPo); } @Override - public void addList(List list) { + public void addList(List list) { this.saveBatch(list,1000); } } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java index f45f5b7..7cab046 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDevModelServiceImpl.java @@ -8,16 +8,12 @@ import com.njcn.access.enums.DataModel; import com.njcn.access.mapper.CsDevModelMapper; import com.njcn.access.pojo.dto.data.*; import com.njcn.access.pojo.dto.devModel.*; -import com.njcn.access.pojo.po.CsDataArrayPO; -import com.njcn.access.pojo.po.CsDataSetPO; import com.njcn.access.pojo.po.CsLineModel; import com.njcn.access.service.*; import com.njcn.access.utils.JsonUtil; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.api.DevModelFeignClient; -import com.njcn.csdevice.pojo.po.CsDevModelPO; -import com.njcn.csdevice.pojo.po.CsGroArr; -import com.njcn.csdevice.pojo.po.CsGroup; +import com.njcn.csdevice.pojo.po.*; import com.njcn.oss.constant.OssPath; import com.njcn.oss.utils.FileStorageUtil; import com.njcn.system.api.DicDataFeignClient; @@ -89,7 +85,6 @@ public class CsDevModelServiceImpl implements ICsDevModelService { analysisDataSet(templateDto,csDevModelPo.getId()); //3.录入监测点模板表(记录当前模板有几个监测点,治理类型的模板目前规定1个监测点,电能质量模板根据逻辑子设备来) addCsLineModel(templateDto,csDevModelPo.getId()); - //todo 这边软件信息和监测点信息需要接入后询问设备,然后记录 } catch (IOException e) { e.printStackTrace(); } @@ -550,31 +545,31 @@ public class CsDevModelServiceImpl implements ICsDevModelService { * 解析数据集、详细数据 */ private void analysisDataSet(TemplateDto templateDto,String pId){ - List setList = new ArrayList<>(); - List arrayList = new ArrayList<>(); + List setList = new ArrayList<>(); + List arrayList = new ArrayList<>(); List dataSetList = templateDto.getDataSet(); //逻辑设备录入 if (CollectionUtil.isNotEmpty(dataSetList)){ dataSetList.forEach(item1->{ String id = IdUtil.fastSimpleUUID(); - CsDataSetPO csDataSetPo = new CsDataSetPO(); - csDataSetPo.setId(id); - csDataSetPo.setPid(pId); - csDataSetPo.setName(item1.getName()); - csDataSetPo.setAnotherName(dataSetName(item1.getName())); - csDataSetPo.setIdx(item1.getIdx()); - csDataSetPo.setPeriod(item1.getPeriod()); - csDataSetPo.setStoreFlag(item1.getStoreFlag()); - csDataSetPo.setDataList(String.join(",",templateDto.getDataList())); - csDataSetPo.setType(0); - csDataSetPo.setClDev(0); - setList.add(csDataSetPo); + CsDataSet CsDataSet = new CsDataSet(); + CsDataSet.setId(id); + CsDataSet.setPid(pId); + CsDataSet.setName(item1.getName()); + CsDataSet.setAnotherName(dataSetName(item1.getName())); + CsDataSet.setIdx(item1.getIdx()); + CsDataSet.setPeriod(item1.getPeriod()); + CsDataSet.setStoreFlag(item1.getStoreFlag()); + CsDataSet.setDataList(String.join(",",templateDto.getDataList())); + CsDataSet.setType(0); + CsDataSet.setClDev(0); + setList.add(CsDataSet); List list = item1.getDataArrayDtoList(); if(CollectionUtil.isNotEmpty(list)) { int i = 0; for (DataArrayDto item2 : list) { - List po = findDict(id,templateDto,item2.getType(),item2.getIdx()); - for (CsDataArrayPO p : po) { + List po = findDict(id,templateDto,item2.getType(),item2.getIdx()); + for (CsDataArray p : po) { p.setIdx(item2.getIdx()); p.setSort(i); i++; @@ -592,30 +587,30 @@ public class CsDevModelServiceImpl implements ICsDevModelService { if (CollectionUtil.isNotEmpty(clDataSetList)){ clDataSetList.forEach(item4->{ String id = IdUtil.fastSimpleUUID(); - CsDataSetPO csDataSetPo = new CsDataSetPO(); - csDataSetPo.setId(id); - csDataSetPo.setPid(pId); - csDataSetPo.setName(item4.getName()); - csDataSetPo.setAnotherName(dataSetName(item4.getName())); - csDataSetPo.setIdx(item4.getIdx()); - csDataSetPo.setPeriod(item4.getPeriod()); - csDataSetPo.setStoreFlag(item4.getStoreFlag()); - csDataSetPo.setDataList(String.join(",",item3.getDataList())); + CsDataSet CsDataSet = new CsDataSet(); + CsDataSet.setId(id); + CsDataSet.setPid(pId); + CsDataSet.setName(item4.getName()); + CsDataSet.setAnotherName(dataSetName(item4.getName())); + CsDataSet.setIdx(item4.getIdx()); + CsDataSet.setPeriod(item4.getPeriod()); + CsDataSet.setStoreFlag(item4.getStoreFlag()); + CsDataSet.setDataList(String.join(",",item3.getDataList())); //fixme 先用数据类型来区分模板的类型 if (item3.getDataList().contains("Apf") || item3.getDataList().contains("Dvr")){ - csDataSetPo.setType(1); + CsDataSet.setType(1); } else { - csDataSetPo.setType(2); + CsDataSet.setType(2); } - csDataSetPo.setClDev(item3.getClDid()); - setList.add(csDataSetPo); + CsDataSet.setClDev(item3.getClDid()); + setList.add(CsDataSet); List list = item4.getDataArrayDtoList(); if(CollectionUtil.isNotEmpty(list)) { int i = 0; for (DataArrayDto item2 : list) { - List po = findDict(id,templateDto,item2.getType(),item2.getIdx()); - for (CsDataArrayPO p : po) { + List po = findDict(id,templateDto,item2.getType(),item2.getIdx()); + for (CsDataArray p : po) { p.setIdx(item2.getIdx()); p.setSort(i); i++; @@ -634,10 +629,10 @@ public class CsDevModelServiceImpl implements ICsDevModelService { csDataArrayService.addList(arrayList); List ls = new ArrayList<>(); List groArrList = new ArrayList<>(); - Map> setMap = arrayList.stream().collect(Collectors.groupingBy(CsDataArrayPO::getPid,LinkedHashMap::new,Collectors.toList())); + Map> setMap = arrayList.stream().collect(Collectors.groupingBy(CsDataArray::getPid,LinkedHashMap::new,Collectors.toList())); setMap.forEach((k0,v0)->{ AtomicReference sort = new AtomicReference<>(0); - Map> map = v0.stream().filter(a-> "avg".equals(a.getStatMethod()) || Objects.isNull(a.getStatMethod())).collect(Collectors.groupingBy(CsDataArrayPO::getAnotherName,LinkedHashMap::new,Collectors.toList())); + Map> map = v0.stream().filter(a-> "avg".equals(a.getStatMethod()) || Objects.isNull(a.getStatMethod())).collect(Collectors.groupingBy(CsDataArray::getAnotherName,LinkedHashMap::new,Collectors.toList())); map.forEach((k,v)->{ //录入组数据 String groupId = IdUtil.simpleUUID(); @@ -672,8 +667,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService { * @param type * @param idx */ - private List findDict(String pid, TemplateDto templateDto, String type, Integer idx) { - List list = new ArrayList<>(); + private List findDict(String pid, TemplateDto templateDto, String type, Integer idx) { + List list = new ArrayList<>(); String id = dicDataFeignClient.getDicDataByCode(type).getData().getId(); String name = null,phase = null; switch (type) { @@ -783,26 +778,26 @@ public class CsDevModelServiceImpl implements ICsDevModelService { if (!Objects.isNull(eleEpdPqd.getStatMethod())){ String[] statMethodList = eleEpdPqd.getStatMethod().split(","); for (String stat : statMethodList) { - CsDataArrayPO csDataArrayPo = new CsDataArrayPO(); - csDataArrayPo.setPid(pid); - csDataArrayPo.setDataId(eleEpdPqd.getId()); - csDataArrayPo.setName(eleEpdPqd.getName() + "_" + i); - csDataArrayPo.setAnotherName((i-0.5) + "次" +eleEpdPqd.getShowName()); - csDataArrayPo.setStatMethod(stat); - csDataArrayPo.setDataType(eleEpdPqd.getDataType()); - csDataArrayPo.setPhase(eleEpdPqd.getPhase()); - list.add(csDataArrayPo); + CsDataArray CsDataArray = new CsDataArray(); + CsDataArray.setPid(pid); + CsDataArray.setDataId(eleEpdPqd.getId()); + CsDataArray.setName(eleEpdPqd.getName() + "_" + i); + CsDataArray.setAnotherName((i-0.5) + "次" +eleEpdPqd.getShowName()); + CsDataArray.setStatMethod(stat); + CsDataArray.setDataType(eleEpdPqd.getType()); + CsDataArray.setPhase(eleEpdPqd.getPhase()); + list.add(CsDataArray); } } else { - CsDataArrayPO csDataArrayPo = new CsDataArrayPO(); - csDataArrayPo.setPid(pid); - csDataArrayPo.setDataId(eleEpdPqd.getId()); - csDataArrayPo.setName(eleEpdPqd.getName() + "_" + i); - csDataArrayPo.setAnotherName((i-0.5) + "次" +eleEpdPqd.getShowName()); - csDataArrayPo.setStatMethod("M"); - csDataArrayPo.setDataType(eleEpdPqd.getDataType()); - csDataArrayPo.setPhase(eleEpdPqd.getPhase()); - list.add(csDataArrayPo); + CsDataArray CsDataArray = new CsDataArray(); + CsDataArray.setPid(pid); + CsDataArray.setDataId(eleEpdPqd.getId()); + CsDataArray.setName(eleEpdPqd.getName() + "_" + i); + CsDataArray.setAnotherName((i-0.5) + "次" +eleEpdPqd.getShowName()); + CsDataArray.setStatMethod("M"); + CsDataArray.setDataType(eleEpdPqd.getType()); + CsDataArray.setPhase(eleEpdPqd.getPhase()); + list.add(CsDataArray); } } } else { @@ -810,26 +805,26 @@ public class CsDevModelServiceImpl implements ICsDevModelService { if (!Objects.isNull(eleEpdPqd.getStatMethod())){ String[] statMethodList = eleEpdPqd.getStatMethod().split(","); for (String stat : statMethodList) { - CsDataArrayPO csDataArrayPo = new CsDataArrayPO(); - csDataArrayPo.setPid(pid); - csDataArrayPo.setDataId(eleEpdPqd.getId()); - csDataArrayPo.setName(eleEpdPqd.getName() + "_" + i); - csDataArrayPo.setAnotherName(i + "次" +eleEpdPqd.getShowName()); - csDataArrayPo.setStatMethod(stat); - csDataArrayPo.setDataType(eleEpdPqd.getDataType()); - csDataArrayPo.setPhase(eleEpdPqd.getPhase()); - list.add(csDataArrayPo); + CsDataArray CsDataArray = new CsDataArray(); + CsDataArray.setPid(pid); + CsDataArray.setDataId(eleEpdPqd.getId()); + CsDataArray.setName(eleEpdPqd.getName() + "_" + i); + CsDataArray.setAnotherName(i + "次" +eleEpdPqd.getShowName()); + CsDataArray.setStatMethod(stat); + CsDataArray.setDataType(eleEpdPqd.getType()); + CsDataArray.setPhase(eleEpdPqd.getPhase()); + list.add(CsDataArray); } } else { - CsDataArrayPO csDataArrayPo = new CsDataArrayPO(); - csDataArrayPo.setPid(pid); - csDataArrayPo.setDataId(eleEpdPqd.getId()); - csDataArrayPo.setName(eleEpdPqd.getName() + "_" + i); - csDataArrayPo.setAnotherName(i + "次" +eleEpdPqd.getShowName()); - csDataArrayPo.setStatMethod("M"); - csDataArrayPo.setDataType(eleEpdPqd.getDataType()); - csDataArrayPo.setPhase(eleEpdPqd.getPhase()); - list.add(csDataArrayPo); + CsDataArray CsDataArray = new CsDataArray(); + CsDataArray.setPid(pid); + CsDataArray.setDataId(eleEpdPqd.getId()); + CsDataArray.setName(eleEpdPqd.getName() + "_" + i); + CsDataArray.setAnotherName(i + "次" +eleEpdPqd.getShowName()); + CsDataArray.setStatMethod("M"); + CsDataArray.setDataType(eleEpdPqd.getType()); + CsDataArray.setPhase(eleEpdPqd.getPhase()); + list.add(CsDataArray); } } } @@ -837,26 +832,26 @@ public class CsDevModelServiceImpl implements ICsDevModelService { if (!Objects.isNull(eleEpdPqd.getStatMethod())){ List statMethodList = Arrays.asList(eleEpdPqd.getStatMethod().split(",")); statMethodList.forEach(stat->{ - CsDataArrayPO csDataArrayPo = new CsDataArrayPO(); - csDataArrayPo.setPid(pid); - csDataArrayPo.setDataId(eleEpdPqd.getId()); - csDataArrayPo.setName(eleEpdPqd.getName()); - csDataArrayPo.setAnotherName(eleEpdPqd.getShowName()); - csDataArrayPo.setStatMethod(stat); - csDataArrayPo.setDataType(eleEpdPqd.getDataType()); - csDataArrayPo.setPhase(eleEpdPqd.getPhase()); - list.add(csDataArrayPo); + CsDataArray CsDataArray = new CsDataArray(); + CsDataArray.setPid(pid); + CsDataArray.setDataId(eleEpdPqd.getId()); + CsDataArray.setName(eleEpdPqd.getName()); + CsDataArray.setAnotherName(eleEpdPqd.getShowName()); + CsDataArray.setStatMethod(stat); + CsDataArray.setDataType(eleEpdPqd.getType()); + CsDataArray.setPhase(eleEpdPqd.getPhase()); + list.add(CsDataArray); }); } else { - CsDataArrayPO csDataArrayPo = new CsDataArrayPO(); - csDataArrayPo.setPid(pid); - csDataArrayPo.setDataId(eleEpdPqd.getId()); - csDataArrayPo.setName(eleEpdPqd.getName()); - csDataArrayPo.setAnotherName(eleEpdPqd.getShowName()); - csDataArrayPo.setStatMethod("M"); - csDataArrayPo.setDataType(eleEpdPqd.getDataType()); - csDataArrayPo.setPhase(eleEpdPqd.getPhase()); - list.add(csDataArrayPo); + CsDataArray CsDataArray = new CsDataArray(); + CsDataArray.setPid(pid); + CsDataArray.setDataId(eleEpdPqd.getId()); + CsDataArray.setName(eleEpdPqd.getName()); + CsDataArray.setAnotherName(eleEpdPqd.getShowName()); + CsDataArray.setStatMethod("M"); + CsDataArray.setDataType(eleEpdPqd.getType()); + CsDataArray.setPhase(eleEpdPqd.getPhase()); + list.add(CsDataArray); } } return list; diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index 08467d8..f30342a 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -3,6 +3,7 @@ package com.njcn.access.service.impl; import cn.hutool.core.util.IdUtil; import com.alibaba.excel.util.CollectionUtils; import com.alibaba.fastjson.JSON; +import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessResponseEnum; @@ -265,7 +266,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_5.getCode())); reqAndResParam.setExpire(-1); - publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, PubUtils.obj2json(reqAndResParam),1,false); + publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); } /** @@ -278,7 +279,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_1.getCode())); reqAndResParam.setExpire(-1); - publisher.send("/Pfm/DevTopic/"+nDid, PubUtils.obj2json(reqAndResParam),1,false); + publisher.send("/Pfm/DevTopic/"+nDid, new Gson().toJson(reqAndResParam),1,false); } /** @@ -297,7 +298,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { accessDto.setNDid(nDid); accessDto.setDevType(devType); reqAndResParam.setMsg(accessDto); - publisher.send("/Pfm/DevReg/"+nDid, PubUtils.obj2json(reqAndResParam),1,false); + publisher.send("/Pfm/DevReg/"+nDid, new Gson().toJson(reqAndResParam),1,false); } public List objectToList(Object object) { @@ -347,7 +348,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService { askDataDto.setDataType(2); } reqAndResParam.setMsg(askDataDto); - publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, PubUtils.obj2json(reqAndResParam),1,false); + publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(reqAndResParam),1,false); } } diff --git a/iot-access/access-boot/src/test/java/com/njcn/AppTest.java b/iot-access/access-boot/src/test/java/com/njcn/AppTest.java index 1783903..fef8b4d 100644 --- a/iot-access/access-boot/src/test/java/com/njcn/AppTest.java +++ b/iot-access/access-boot/src/test/java/com/njcn/AppTest.java @@ -4,7 +4,12 @@ import static org.junit.Assert.assertTrue; import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken; import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.graphbuilder.math.func.EFunction; +import com.njcn.access.AccessBootApplication; +import com.njcn.access.enums.AccessEnum; +import com.njcn.access.enums.TypeEnum; +import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.mqtt.MqttClientDto; import io.lettuce.core.protocol.CompleteableCommand; import okhttp3.Credentials; @@ -12,7 +17,12 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.context.web.WebAppConfiguration; +import javax.annotation.Resource; import java.io.BufferedReader; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -26,6 +36,9 @@ import java.util.stream.Collectors; /** * Unit test for simple App. */ +@RunWith(SpringRunner.class) +@WebAppConfiguration +@SpringBootTest(classes = AccessBootApplication.class) public class AppTest { /** @@ -37,30 +50,43 @@ public class AppTest assertTrue( true ); } + @Resource + private MqttPublisher publisher; - public static void main(String[] args) { - try { - String username = "ac760c62395cecec"; - String password = "k0vGfe5xOE2Bl4DCF73uahcknvcwoKOEDPnOkMvuSBB"; - OkHttpClient client = new OkHttpClient(); - - Request request = new Request.Builder() - .url("http://192.168.1.18:18083/api/v5/clients/access-boot123456") - .header("Content-Type", "application/json") - .header("Authorization", Credentials.basic(username, password)) - .build(); - - Response response = client.newCall(request).execute(); - response.body(); - String res = Objects.requireNonNull(response.body()).string(); - - Gson gson = new Gson(); - MqttClientDto mqttClientDto = gson.fromJson(res, new TypeToken(){}.getType()); - System.out.println("mqttClientDto==:" + mqttClientDto.isConnected()); - - } catch (IOException e) { - e.printStackTrace(); - } + @Test + public void test() { + ReqAndResDto reqAndResParam = new ReqAndResDto(); + reqAndResParam.setMid(1); + reqAndResParam.setDid(0); + reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); + reqAndResParam.setType(4866); + publisher.send("/Dev/Data1/V1/123", new Gson().toJson(reqAndResParam),1,false); } + +// public static void main(String[] args) { +// try { +// String username = "ac760c62395cecec"; +// String password = "k0vGfe5xOE2Bl4DCF73uahcknvcwoKOEDPnOkMvuSBB"; +// +// OkHttpClient client = new OkHttpClient(); +// +// Request request = new Request.Builder() +// .url("http://192.168.1.18:18083/api/v5/clients/access-boot123456") +// .header("Content-Type", "application/json") +// .header("Authorization", Credentials.basic(username, password)) +// .build(); +// +// Response response = client.newCall(request).execute(); +// response.body(); +// String res = Objects.requireNonNull(response.body()).string(); +// +// Gson gson = new Gson(); +// MqttClientDto mqttClientDto = gson.fromJson(res, new TypeToken(){}.getType()); +// System.out.println("mqttClientDto==:" + mqttClientDto.isConnected()); +// +// } catch (IOException e) { +// e.printStackTrace(); +// } +// } } diff --git a/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/controller/RtController.java b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/controller/RtController.java new file mode 100644 index 0000000..dfdae56 --- /dev/null +++ b/iot-analysis/analysis-rt/rt-boot/src/main/java/com/njcn/rt/controller/RtController.java @@ -0,0 +1,46 @@ +package com.njcn.rt.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.web.controller.BaseController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/14 9:20 + */ +@Slf4j +@RestController +@RequestMapping("/rtData") +@Api(tags = "实时数据") +@AllArgsConstructor +public class RtController extends BaseController { + +// @OperateInfo(info = LogEnum.BUSINESS_COMMON) +// @PostMapping("/analysis") +// @ApiOperation("数据解析") +// @ApiImplicitParam(name = "csDataEffectiveAddParm", value = "新增app数据有效性表参数", required = true) +// public HttpResult addDataEffective(@RequestBody @Validated CsDataEffectiveAddParm csDataEffectiveAddParm){ +// String methodDescribe = getMethodDescribe("addDataEffective"); +// +// boolean save = csDataEffectiveService.add (csDataEffectiveAddParm); +// return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, save, methodDescribe); +// } + + +} diff --git a/iot-analysis/analysis-stat/stat-api/pom.xml b/iot-analysis/analysis-stat/stat-api/pom.xml index 169420b..ea1b37c 100644 --- a/iot-analysis/analysis-stat/stat-api/pom.xml +++ b/iot-analysis/analysis-stat/stat-api/pom.xml @@ -11,8 +11,34 @@ stat-api 1.0.0 + + + com.njcn + common-core + ${project.version} + + + com.njcn + common-db + ${project.version} + + + org.projectlombok + lombok + + + com.njcn + common-microservice + ${project.version} + + + com.njcn + common-mq + ${project.version} + + - stat-api + stat-api UTF-8 diff --git a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/RtFeignClient.java b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/RtFeignClient.java new file mode 100644 index 0000000..c275ece --- /dev/null +++ b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/RtFeignClient.java @@ -0,0 +1,19 @@ +package com.njcn.stat.api; + +import com.njcn.common.pojo.constant.ServerInfo; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.AppAutoDataMessage; +import com.njcn.stat.api.fallback.RtClientFallbackFactory; +import io.swagger.annotations.ApiOperation; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; + +/** + * @author xy + */ +@FeignClient(value = ServerInfo.CS_STAT_BOOT, path = "/stat", fallbackFactory = RtClientFallbackFactory.class,contextId = "stat") +public interface RtFeignClient { + + @PostMapping("/analysis") + HttpResult analysis(AppAutoDataMessage appAutoDataMessage); +} diff --git a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/RtClientFallbackFactory.java b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/RtClientFallbackFactory.java new file mode 100644 index 0000000..9e1e4af --- /dev/null +++ b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/api/fallback/RtClientFallbackFactory.java @@ -0,0 +1,35 @@ +package com.njcn.stat.api.fallback; + +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.mq.message.AppAutoDataMessage; +import com.njcn.stat.api.RtFeignClient; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author xy + */ +@Slf4j +@Component +public class RtClientFallbackFactory implements FallbackFactory { + @Override + public RtFeignClient create(Throwable cause) { + //判断抛出异常是否为解码器抛出的业务异常 + Enum exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; + if (cause.getCause() instanceof BusinessException) { + BusinessException businessException = (BusinessException) cause.getCause(); + } + Enum finalExceptionEnum = exceptionEnum; + return new RtFeignClient() { + + @Override + public HttpResult analysis(AppAutoDataMessage appAutoDataMessage) { + log.error("{}异常,降级处理,异常为:{}","数据解析",cause.toString()); + throw new BusinessException(finalExceptionEnum); + } + }; + } +} diff --git a/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/enums/StatResponseEnum.java b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/enums/StatResponseEnum.java new file mode 100644 index 0000000..b86c968 --- /dev/null +++ b/iot-analysis/analysis-stat/stat-api/src/main/java/com/njcn/stat/enums/StatResponseEnum.java @@ -0,0 +1,37 @@ +package com.njcn.stat.enums; + +import lombok.Getter; + +/** + * @author xuyang + * @version 1.0.0 + * @date 2023年04月17日 10:50 + */ +@Getter +public enum StatResponseEnum { + + /** + * A1001 ~ A1099 用于用户模块的枚举 + *

+ */ + STAT_ERROR("A10001","统计数据模块错误"), + + DATA_ARRAY_NULL("A10002","详细数据为空"), + AUTO_DATA_NULL("A10002","上送数据为空"), + DICT_NULL("A10002","字典数据为空"), + LINE_NULL("A10002","监测点为空"), + + ARRAY_DATA_NOT_MATCH("A10003","上送数据与模板匹配失败"), + + ; + + private final String code; + + private final String message; + + StatResponseEnum(String code, String message) { + this.code = code; + this.message = message; + } + +} diff --git a/iot-analysis/analysis-stat/stat-boot/pom.xml b/iot-analysis/analysis-stat/stat-boot/pom.xml index fa5f9b6..d204c56 100644 --- a/iot-analysis/analysis-stat/stat-boot/pom.xml +++ b/iot-analysis/analysis-stat/stat-boot/pom.xml @@ -40,6 +40,42 @@ common-db ${project.version} + + com.njcn + access-api + ${project.version} + + + com.njcn + common-mq + ${project.version} + + + com.njcn + cs-device-api + ${project.version} + + + com.njcn + stat-api + ${project.version} + + + com.njcn + system-api + ${project.version} + + + com.njcn + pqs-influx + 0.0.1-SNAPSHOT + + + com.njcn + common-influxDB + ${project.version} + compile + diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/controller/StatController.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/controller/StatController.java new file mode 100644 index 0000000..b99ae4e --- /dev/null +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/controller/StatController.java @@ -0,0 +1,48 @@ +package com.njcn.stat.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.mq.message.AppAutoDataMessage; +import com.njcn.stat.service.IStatService; +import com.njcn.web.controller.BaseController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/14 9:23 + */ +@Slf4j +@RestController +@RequestMapping("/stat") +@Api(tags = "统计数据") +@AllArgsConstructor +public class StatController extends BaseController { + + private final IStatService statService; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/analysis") + @ApiOperation("数据解析") + @ApiImplicitParam(name = "appAutoDataMessage", value = "数据实体", required = true) + public HttpResult analysis(@RequestBody @Validated AppAutoDataMessage appAutoDataMessage){ + String methodDescribe = getMethodDescribe("analysis"); + statService.analysis(appAutoDataMessage); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe); + } + +} diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/handler/MqttMessageHandler.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/handler/MqttMessageHandler.java new file mode 100644 index 0000000..b1d587d --- /dev/null +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/handler/MqttMessageHandler.java @@ -0,0 +1,63 @@ +package com.njcn.stat.handler; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.github.tocrhz.mqtt.annotation.MqttSubscribe; +import com.github.tocrhz.mqtt.annotation.NamedValue; +import com.github.tocrhz.mqtt.annotation.Payload; +import com.github.tocrhz.mqtt.publisher.MqttPublisher; +import com.njcn.access.pojo.dto.ReqAndResDto; +import com.njcn.redis.utils.RedisUtil; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import javax.validation.Validator; +import java.nio.charset.StandardCharsets; + +/** + * @author hongawen + * @version 1.0.0 + * @date 2022年03月23日 09:41 + */ +@Slf4j +@Component +@AllArgsConstructor +public class MqttMessageHandler { + + private final MqttPublisher publisher; + + private final RedisUtil redisUtil; + + @Autowired + Validator validator; + + + /** + * 装置心跳 + * @param topic + * @param message + * @param version + * @param nDid + * @param payload + */ + @MqttSubscribe(value = "/Dev/Data/{version}/{edgeId}",qos = 1) + @Transactional(rollbackFor = Exception.class) + public void devHeartBeat(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload) { + //解析数据 + Gson gson = new Gson(); + ReqAndResDto.Req res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Req.class); + //响应请求 + switch (res.getType()){ + case 4866: + //处理主动上送数据 + + break; + default: + break; + } + } + +} diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java new file mode 100644 index 0000000..57b7e92 --- /dev/null +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/IStatService.java @@ -0,0 +1,16 @@ +package com.njcn.stat.service; + +import com.njcn.mq.message.AppAutoDataMessage; + +/** + * @author xy + */ +public interface IStatService { + + /** + * 解析统计数据 + * @param appAutoDataMessage + */ + void analysis(AppAutoDataMessage appAutoDataMessage); + +} diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java new file mode 100644 index 0000000..c6cc409 --- /dev/null +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -0,0 +1,177 @@ +package com.njcn.stat.service.impl; + +import cn.hutool.core.collection.CollectionUtil; +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.njcn.access.enums.AccessResponseEnum; +import com.njcn.common.pojo.exception.BusinessException; +import com.njcn.common.utils.PubUtils; +import com.njcn.csdevice.api.CsLineFeignClient; +import com.njcn.csdevice.api.DataArrayFeignClient; +import com.njcn.csdevice.pojo.param.DataArrayParam; +import com.njcn.csdevice.pojo.po.CsDataArray; +import com.njcn.csdevice.pojo.po.CsLinePO; +import com.njcn.influx.pojo.constant.InfluxDBTableConstant; +import com.njcn.influxdb.utils.InfluxDbUtils; +import com.njcn.mq.message.AppAutoDataMessage; +import com.njcn.redis.utils.RedisUtil; +import com.njcn.stat.enums.StatResponseEnum; +import com.njcn.stat.service.IStatService; +import com.njcn.system.api.DicDataFeignClient; +import com.njcn.system.api.EpdFeignClient; +import com.njcn.system.enums.DicDataEnum; +import com.njcn.system.enums.SystemResponseEnum; +import com.njcn.system.pojo.dto.EpdDTO; +import com.njcn.system.pojo.po.Dic; +import com.njcn.system.pojo.po.DictData; +import com.njcn.system.pojo.po.EleEpdPqd; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.net.Inet4Address; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/14 9:32 + */ +@Service +@Slf4j +@AllArgsConstructor +public class StatServiceImpl implements IStatService { + + private final DataArrayFeignClient dataArrayFeignClient; + + private final EpdFeignClient epdFeignClient; + + private final DicDataFeignClient dicDataFeignClient; + + private final InfluxDbUtils influxDbUtils; + + private final CsLineFeignClient csLineFeignClient; + + private final RedisUtil redisUtil; + + @Override + @Transactional(rollbackFor = Exception.class) + public void analysis(AppAutoDataMessage appAutoDataMessage) { + //1.根据设备网络识别码获取设备id,查询到所用的模板,用来判断模板的类型(治理模板还是电能质量模板) + //2.解析appAutoDataMessage的Did,来判断当前数据是治理数据还是电能质量数据 + //3-1.治理数据则获取治理的dataArray,并且查询治理的监测点 + //3-2.电能质量数据则还需要判断Cldid的值,在dataSet里面筛选正确的数据集,查询dataArray(默认Cldid=1为负载侧监测点、Cldid=2为电网侧监测点) + //4.查询dataArray数据,查询对应的字典数据,然后将数据组装,录入influxDB + DataArrayParam dataArrayParam = new DataArrayParam(); + dataArrayParam.setId(appAutoDataMessage.getId()); + dataArrayParam.setDid(appAutoDataMessage.getDid()); + dataArrayParam.setCldId(appAutoDataMessage.getMsg().getClDid()); + List list = appAutoDataMessage.getMsg().getDataArray(); + //获取监测点id + String lineId = lineInfo(appAutoDataMessage.getId(),appAutoDataMessage.getMsg().getClDid()); + //缓存指标和influxDB表关系 + saveData(); + if (CollectionUtil.isNotEmpty(list)){ + list.forEach(item->{ + switch (item.getDataAttr()) { + case 1: + log.info("处理最大值"); + dataArrayParam.setStatMethod("max"); + break; + case 2: + log.info("处理最小值"); + dataArrayParam.setStatMethod("min"); + break; + case 3: + log.info("处理avg"); + dataArrayParam.setStatMethod("avg"); + break; + case 4: + log.info("处理cp95"); + dataArrayParam.setStatMethod("cp95"); + break; + default: + break; + } + insertData(lineId,dataArrayParam,item); + }); + } + } + + /** + * 获取监测点相关信息 + */ + public String lineInfo(String id, Integer clDid) { + Map map = new HashMap<>(); + List lineList = csLineFeignClient.findByNdid(id).getData(); + if (CollectionUtil.isEmpty(lineList)){ + throw new BusinessException(StatResponseEnum.LINE_NULL); + } + lineList.forEach(item->{ + DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData(); + if (Objects.isNull(dictData)){ + throw new BusinessException(StatResponseEnum.DICT_NULL); + } + if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){ + map.put(0,item.getLineId()); + } else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){ + map.put(1,item.getLineId()); + } else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){ + map.put(2,item.getLineId()); + } + }); + return map.get(clDid); + } + + /** + * 缓存字典和influxDB表关系 + */ + public void saveData() { + Map map = new HashMap<>(); + List list = epdFeignClient.findAll().getData(); + if (CollectionUtil.isEmpty(list)){ + throw new BusinessException(StatResponseEnum.DICT_NULL); + } + list.forEach(item->{ + map.put(item.getDictName(),item.getTableName()); + }); + redisUtil.saveByKeyWithExpire("ELEEPDPQD",map,180L); + } + + + /** + * influxDB数据入库 + */ + public void insertData(String lineId,DataArrayParam dataArrayParam,AppAutoDataMessage.DataArray item) { + //获取详细数据 + List dataArrayList = dataArrayFeignClient.findListByParam(dataArrayParam).getData(); + if (CollectionUtil.isEmpty(dataArrayList)){ + throw new BusinessException(StatResponseEnum.DATA_ARRAY_NULL); + } + //解码 + List floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData())); + if (CollectionUtil.isEmpty(floats)){ + throw new BusinessException(StatResponseEnum.AUTO_DATA_NULL); + } + //校验模板和解码数据数量能否对应上 + if (!Objects.equals(dataArrayList.size(),floats.size())){ + throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH); + } + for (int i = 0; i < dataArrayList.size(); i++) { + String tableName = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey("ELEEPDPQD")), Map.class).get(dataArrayList.get(i).getName()).toString(); + Map tags = new HashMap<>(); + tags.put(InfluxDBTableConstant.LINE_ID,lineId); + tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase()); + tags.put(InfluxDBTableConstant.VALUE_TYPE,dataArrayList.get(i).getStatMethod()); + Map fields = new HashMap<>(); + fields.put(dataArrayList.get(i).getName(),floats.get(i)); + fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag()); + influxDbUtils.insert(tableName, tags, fields, item.getDataTimeSec(), TimeUnit.SECONDS); + } + } + +} diff --git a/iot-message/message-boot/pom.xml b/iot-message/message-boot/pom.xml index 9fa2c18..e2fd02b 100644 --- a/iot-message/message-boot/pom.xml +++ b/iot-message/message-boot/pom.xml @@ -51,6 +51,16 @@ 1.0.0 compile + + com.njcn + common-mq + ${project.version} + + + com.njcn + stat-api + ${project.version} + diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java b/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java index ae998e9..2a8a37e 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/MessageBootApplication.java @@ -13,7 +13,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients; * @date 2021年12月09日 20:59 */ @Slf4j -@MapperScan("com.njcn.**.mapper") +//@MapperScan("com.njcn.**.mapper") @EnableFeignClients(basePackages = "com.njcn") @SpringBootApplication(scanBasePackages = "com.njcn") public class MessageBootApplication { diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java new file mode 100644 index 0000000..e14d486 --- /dev/null +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppAutoDataConsumer.java @@ -0,0 +1,68 @@ +package com.njcn.message.consumer; + +import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler; +import com.njcn.mq.constant.BusinessTopic; +import com.njcn.mq.message.AppAutoDataMessage; +import com.njcn.stat.api.RtFeignClient; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/8/11 15:32 + */ +@Service +@RocketMQMessageListener( + topic = BusinessTopic.NJCJ_APP_AUTO_DATA_TOPIC, + consumerGroup = BusinessTopic.NJCJ_APP_AUTO_DATA_TOPIC, + enableMsgTrace = true +) +@Slf4j +public class AppAutoDataConsumer extends EnhanceConsumerMessageHandler implements RocketMQListener { + + @Resource + private RtFeignClient rtFeignClient; + + @Override + protected void handleMessage(AppAutoDataMessage appAutoDataMessage) { + Integer dataAttr = appAutoDataMessage.getMsg().getDataAttr(); + switch (dataAttr) { + case 1: + log.info("分发至实时数据"); + break; + case 2: + log.info("分发至统计数据"); + rtFeignClient.analysis(appAutoDataMessage); + break; + default: + break; + } + } + + @Override + protected void handleMaxRetriesExceeded(AppAutoDataMessage appAutoDataMessage) { + + } + + @Override + protected boolean isRetry() { + return false; + } + + @Override + protected boolean throwException() { + return false; + } + + @Override + public void onMessage(AppAutoDataMessage appAutoDataMessage) { + super.dispatchMessage(appAutoDataMessage); + } +} diff --git a/iot-message/message-boot/src/main/resources/bootstrap.yml b/iot-message/message-boot/src/main/resources/bootstrap.yml index 29f3ac9..b0feb12 100644 --- a/iot-message/message-boot/src/main/resources/bootstrap.yml +++ b/iot-message/message-boot/src/main/resources/bootstrap.yml @@ -6,7 +6,7 @@ microservice: sentinel: url: @sentinel.url@ gateway: - url: + url: @gateway.url@ server: port: 10302 #feign接口开启服务熔断降级处理 @@ -31,7 +31,7 @@ spring: shared-configs: - data-id: share-config.yaml refresh: true - - data-Id: share-config-datasource-db.yaml + - data-Id: algorithm-config.yaml refresh: true main: allow-bean-definition-overriding: true diff --git a/pom.xml b/pom.xml index 4dc4c24..5d8a24a 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ 192.168.1.13 - 192.168.1.139 + 127.0.0.1 192.168.1.13