From f91670786f6db9e4599e06c899d5acb7c64fecea Mon Sep 17 00:00:00 2001 From: xuyang <748613696@qq.com> Date: Fri, 8 Sep 2023 16:04:31 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=B2=BB=E7=90=86=E6=9A=82=E6=80=81=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E3=80=81=E6=B3=A2=E5=BD=A2=E6=96=87=E4=BB=B6=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E8=A7=A3=E6=9E=90=E5=8A=9F=E8=83=BD;=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E8=AE=B0=E5=BD=95=E5=8A=9F=E8=83=BD=202.=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E4=B8=8A=E7=BA=BF=E3=80=81=E6=8E=89=E7=BA=BF=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/pojo/po/CsDeviceOnlineLogs.java | 47 +++++ .../CsDeviceOnlineLogsController.java | 54 ++++++ .../access/handler/MqttMessageHandler.java | 20 +++ .../listener/RedisKeyExpirationListener.java | 10 ++ .../mapper/CsDeviceOnlineLogsMapper.java | 16 ++ .../service/ICsDeviceOnlineLogsService.java | 23 +++ .../impl/CsDeviceOnlineLogsServiceImpl.java | 25 +++ .../com/njcn/zlevent/pojo/po/CsEventLogs.java | 67 ++++++++ .../controller/CsEventLogsController.java | 30 ++++ .../zlevent/mapper/CsEventLogsMapper.java | 17 ++ .../zlevent/service/ICsEventLogsService.java | 16 ++ .../service/impl/CsEventLogsServiceImpl.java | 20 +++ .../service/impl/CsEventServiceImpl.java | 1 - .../service/impl/CsWaveServiceImpl.java | 1 - .../service/impl/EventServiceImpl.java | 1 + .../zlevent/service/impl/FileServiceImpl.java | 160 ++++++++++-------- .../njcn/zlevent/utils/FileCheckUtils.java | 59 +++++++ .../message/consumer/AppEventConsumer.java | 3 + .../consumer/AppFileStreamConsumer.java | 2 +- 19 files changed, 503 insertions(+), 69 deletions(-) create mode 100644 iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDeviceOnlineLogs.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceOnlineLogsController.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDeviceOnlineLogsMapper.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDeviceOnlineLogsService.java create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceOnlineLogsServiceImpl.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/CsEventLogsController.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java create mode 100644 iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/FileCheckUtils.java diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDeviceOnlineLogs.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDeviceOnlineLogs.java new file mode 100644 index 0000000..6d09e97 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/po/CsDeviceOnlineLogs.java @@ -0,0 +1,47 @@ +package com.njcn.access.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.njcn.db.bo.BaseEntity; +import java.io.Serializable; +import java.time.LocalDateTime; + +import lombok.Data; +import lombok.Getter; +import lombok.Setter; + +/** + *

+ * 设备状态日志表,记录设备掉线上线的情况 + *

+ * + * @author xuyang + * @since 2023-09-08 + */ +@Data +@TableName("cs_device_online_logs") +public class CsDeviceOnlineLogs { + + private static final long serialVersionUID = 1L; + + /** + * id + */ + private String id; + + /** + * 设备识别码 + */ + private String ndid; + + /** + * 掉线时间 + */ + private LocalDateTime offlineTime; + + /** + * 上线时间 + */ + private LocalDateTime onlineTime; + + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceOnlineLogsController.java b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceOnlineLogsController.java new file mode 100644 index 0000000..1dc2041 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceOnlineLogsController.java @@ -0,0 +1,54 @@ +package com.njcn.access.controller; + + +import com.njcn.access.pojo.po.CsDeviceOnlineLogs; +import com.njcn.access.service.ICsDevModelService; +import com.njcn.access.service.ICsDeviceOnlineLogsService; +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.common.utils.HttpResultUtil; +import com.njcn.web.annotation.ReturnMsg; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import com.njcn.web.controller.BaseController; +import org.springframework.web.multipart.MultipartFile; + +/** + *

+ * 设备状态日志表,记录设备掉线上线的情况 前端控制器 + *

+ * + * @author xuyang + * @since 2023-09-08 + */ +@Slf4j +@RestController +@RequestMapping("/csDeviceOnlineLogs") +@Api(tags = "设备上线日志表") +@AllArgsConstructor +public class CsDeviceOnlineLogsController extends BaseController { + + private final ICsDeviceOnlineLogsService csDeviceOnlineLogsService; + + @OperateInfo(info = LogEnum.BUSINESS_COMMON) + @PostMapping("/find") + @ApiOperation("find") + @ApiImplicitParam(name = "nDid", value = "设备识别码", required = true) + public HttpResult devRegister(@RequestParam String nDid){ + CsDeviceOnlineLogs vo = csDeviceOnlineLogsService.findLastData(nDid); + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, vo, "123"); + } + + + +} + diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index e9fb3ec..4401e19 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -16,8 +16,10 @@ import com.njcn.access.pojo.dto.*; import com.njcn.access.pojo.dto.file.FileDto; import com.njcn.access.pojo.dto.heart.HeartBeatDto; import com.njcn.access.pojo.param.ReqAndResParam; +import com.njcn.access.pojo.po.CsDeviceOnlineLogs; import com.njcn.access.pojo.po.CsLineModel; import com.njcn.access.pojo.po.CsTopic; +import com.njcn.access.service.ICsDeviceOnlineLogsService; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsLineModelService; import com.njcn.access.service.ICsTopicService; @@ -50,6 +52,7 @@ import javax.validation.ConstraintViolation; import javax.validation.Validator; import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; @@ -87,6 +90,8 @@ public class MqttMessageHandler { private final AppFileStreamMessageTemplate appFileStreamMessageTemplate; + private final ICsDeviceOnlineLogsService onlineLogsService; + @Autowired Validator validator; @@ -309,6 +314,21 @@ public class MqttMessageHandler { //修改装置状态 csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.ACCESS.getCode()); csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode()); + //记录设备上线 + CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); + CsDeviceOnlineLogs csDeviceOnlineLogs = new CsDeviceOnlineLogs(); + if(Objects.isNull(record)) { + csDeviceOnlineLogs.setNdid(nDid); + csDeviceOnlineLogs.setOnlineTime(LocalDateTime.now()); + onlineLogsService.save(csDeviceOnlineLogs); + } else { + LocalDateTime time = record.getOfflineTime(); + if (!Objects.isNull(time)){ + csDeviceOnlineLogs.setNdid(nDid); + csDeviceOnlineLogs.setOnlineTime(LocalDateTime.now()); + onlineLogsService.save(csDeviceOnlineLogs); + } + } } else { log.info(AccessResponseEnum.ACCESS_RESPONSE_ERROR.getMessage()); logDto.setResult(0); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index 4d0d0ca..bab4140 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -1,6 +1,8 @@ package com.njcn.access.listener; import com.njcn.access.enums.AccessEnum; +import com.njcn.access.pojo.po.CsDeviceOnlineLogs; +import com.njcn.access.service.ICsDeviceOnlineLogsService; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.impl.CsDeviceServiceImpl; @@ -17,6 +19,7 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.time.LocalDateTime; import java.util.Objects; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -43,6 +46,9 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene @Resource private CsLogsFeignClient csLogsFeignClient; + @Resource + private ICsDeviceOnlineLogsService onlineLogsService; + public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @@ -74,6 +80,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode()); logDto.setOperate("装置掉线,装置为:" + nDid); csLogsFeignClient.addUserLog(logDto); + //记录装置下线日志 + CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid); + record.setOfflineTime(LocalDateTime.now()); + onlineLogsService.updateById(record); //立马发起接入请求 String version = csTopicService.getVersion(nDid); log.info("装置掉线立马发送接入请求,接入失败则进入定时接入任务"); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDeviceOnlineLogsMapper.java b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDeviceOnlineLogsMapper.java new file mode 100644 index 0000000..e36e511 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/mapper/CsDeviceOnlineLogsMapper.java @@ -0,0 +1,16 @@ +package com.njcn.access.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.access.pojo.po.CsDeviceOnlineLogs; + +/** + *

+ * 设备状态日志表,记录设备掉线上线的情况 Mapper 接口 + *

+ * + * @author xuyang + * @since 2023-09-08 + */ +public interface CsDeviceOnlineLogsMapper extends BaseMapper { + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDeviceOnlineLogsService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDeviceOnlineLogsService.java new file mode 100644 index 0000000..fc430a2 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsDeviceOnlineLogsService.java @@ -0,0 +1,23 @@ +package com.njcn.access.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.access.pojo.po.CsDeviceOnlineLogs; + +/** + *

+ * 设备状态日志表,记录设备掉线上线的情况 服务类 + *

+ * + * @author xuyang + * @since 2023-09-08 + */ +public interface ICsDeviceOnlineLogsService extends IService { + + /** + * 根据nDid查询最新的一条记录 + * @param nDid + * @return + */ + CsDeviceOnlineLogs findLastData(String nDid); + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceOnlineLogsServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceOnlineLogsServiceImpl.java new file mode 100644 index 0000000..9f54040 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceOnlineLogsServiceImpl.java @@ -0,0 +1,25 @@ +package com.njcn.access.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.access.mapper.CsDeviceOnlineLogsMapper; +import com.njcn.access.pojo.po.CsDeviceOnlineLogs; +import com.njcn.access.service.ICsDeviceOnlineLogsService; +import org.springframework.stereotype.Service; + +/** + *

+ * 设备状态日志表,记录设备掉线上线的情况 服务实现类 + *

+ * + * @author xuyang + * @since 2023-09-08 + */ +@Service +public class CsDeviceOnlineLogsServiceImpl extends ServiceImpl implements ICsDeviceOnlineLogsService { + + @Override + public CsDeviceOnlineLogs findLastData(String nDid) { + return this.lambdaQuery().eq(CsDeviceOnlineLogs::getNdid,nDid).orderByDesc(CsDeviceOnlineLogs::getOnlineTime).last("LIMIT 1").one(); + } +} diff --git a/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java new file mode 100644 index 0000000..224ca40 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-api/src/main/java/com/njcn/zlevent/pojo/po/CsEventLogs.java @@ -0,0 +1,67 @@ +package com.njcn.zlevent.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.njcn.db.bo.BaseEntity; +import java.io.Serializable; +import java.time.LocalDateTime; + +import lombok.Data; +import lombok.Getter; +import lombok.Setter; + +/** + *

+ * 治理暂态文件解析日志 + *

+ * + * @author xuyang + * @since 2023-09-08 + */ +@Data +@TableName("cs_event_logs") +public class CsEventLogs { + + private static final long serialVersionUID = 1L; + + /** + * id + */ + private String id; + + /** + * 设备识别码 + */ + private String ndid; + + /** + * 文件名称(全路径) + */ + private String fileName; + + /** + * 解析状态(0:失败 1:成功) + */ + private Integer status; + + /** + * 备注 + */ + private String remark; + + /** + * 解析完成时间 + */ + private LocalDateTime completeTime; + + /** + * 波形文件起始时间 + */ + private LocalDateTime startTime; + + /** + * 波形结束时间 + */ + private LocalDateTime endTime; + + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/CsEventLogsController.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/CsEventLogsController.java new file mode 100644 index 0000000..b52d6d2 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/controller/CsEventLogsController.java @@ -0,0 +1,30 @@ +package com.njcn.zlevent.controller; + + +import io.swagger.annotations.Api; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.RequestMapping; + +import org.springframework.web.bind.annotation.RestController; +import com.njcn.web.controller.BaseController; +import springfox.documentation.annotations.ApiIgnore; + +/** + *

+ * 治理暂态文件解析日志 前端控制器 + *

+ * + * @author xuyang + * @since 2023-09-08 + */ +@Slf4j +@RestController +@RequestMapping("/csEventLogs") +@Api(tags = "暂态文件日志处理") +@AllArgsConstructor +@ApiIgnore +public class CsEventLogsController extends BaseController { + +} + diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java new file mode 100644 index 0000000..8bcf399 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/mapper/CsEventLogsMapper.java @@ -0,0 +1,17 @@ +package com.njcn.zlevent.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.njcn.zlevent.pojo.po.CsEventLogs; + +/** + *

+ * 治理暂态文件解析日志 Mapper 接口 + *

+ * + * @author xuyang + * @since 2023-09-08 + */ +public interface CsEventLogsMapper extends BaseMapper { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java new file mode 100644 index 0000000..cf057f9 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/ICsEventLogsService.java @@ -0,0 +1,16 @@ +package com.njcn.zlevent.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.njcn.zlevent.pojo.po.CsEventLogs; + +/** + *

+ * 治理暂态文件解析日志 服务类 + *

+ * + * @author xuyang + * @since 2023-09-08 + */ +public interface ICsEventLogsService extends IService { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java new file mode 100644 index 0000000..22cecf8 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventLogsServiceImpl.java @@ -0,0 +1,20 @@ +package com.njcn.zlevent.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.zlevent.mapper.CsEventLogsMapper; +import com.njcn.zlevent.pojo.po.CsEventLogs; +import com.njcn.zlevent.service.ICsEventLogsService; +import org.springframework.stereotype.Service; + +/** + *

+ * 治理暂态文件解析日志 服务实现类 + *

+ * + * @author xuyang + * @since 2023-09-08 + */ +@Service +public class CsEventLogsServiceImpl extends ServiceImpl implements ICsEventLogsService { + +} diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java index b5fe78c..22991e7 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsEventServiceImpl.java @@ -1,6 +1,5 @@ package com.njcn.zlevent.service.impl; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.njcn.csharmonic.pojo.po.CsEventPO; diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java index 5d60354..278fd0d 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/CsWaveServiceImpl.java @@ -115,7 +115,6 @@ public class CsWaveServiceImpl implements ICsWaveService { time -= 8 * 3600; // 将millisecond转换为长整型,并乘以1000以获取微秒 long millisecondValue = millisecond.longValue() * 1000; -// long millisecondValue = Long.parseLong(String.valueOf(millisecond))*1000; // 计算最终时间 long finalTime = subtleTime + millisecondValue; // 如果finalTime大于等于1000000,将startTime增加1秒,finalTime减去1000000 diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java index 09ce802..7ecb5be 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/EventServiceImpl.java @@ -72,6 +72,7 @@ public class EventServiceImpl implements IEventService { @Override @Transactional(rollbackFor = Exception.class) public void analysis(AppEventMessage appEventMessage) { + //todo 这边到时候装置事件、暂态事件需要分开处理 List list1 = new ArrayList<>(); List records = new ArrayList(); List list2 = new ArrayList<>(); diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java index 0ca15eb..29d8e47 100644 --- a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/service/impl/FileServiceImpl.java @@ -21,8 +21,11 @@ import com.njcn.zlevent.param.CsEventParam; import com.njcn.zlevent.pojo.dto.FileInfoDto; import com.njcn.zlevent.pojo.dto.FileStreamDto; import com.njcn.zlevent.pojo.dto.WaveTimeDto; +import com.njcn.zlevent.pojo.po.CsEventLogs; +import com.njcn.zlevent.service.ICsEventLogsService; import com.njcn.zlevent.service.ICsEventService; import com.njcn.zlevent.service.IFileService; +import com.njcn.zlevent.utils.FileCheckUtils; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.sf.json.JSONObject; @@ -33,6 +36,10 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; +import java.security.NoSuchAlgorithmException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.*; /** @@ -57,6 +64,8 @@ public class FileServiceImpl implements IFileService { private final ICsEventService csEventService; + private final ICsEventLogsService csEventLogsService; + @Override public void analysisFileInfo(AppFileMessage appFileMessage) { if (Objects.equals(appFileMessage.getCode(), AccessEnum.SUCCESS.getCode())){ @@ -73,7 +82,7 @@ public class FileServiceImpl implements IFileService { } else { int total = (int)Math.ceil(fileSize*1.0/range) ; for (int i = 0; i < total; i++) { - askFileStream(appFileMessage.getId(),mid,fileName,i*range,range-1); + askFileStream(appFileMessage.getId(),mid,fileName,i*range,range); mid++; } fileInfoDto.setNumber(mid-1); @@ -101,79 +110,98 @@ public class FileServiceImpl implements IFileService { } @Override - @Transactional(rollbackFor = Exception.class) public void analysisFileStream(AppFileMessage appFileMessage) { - //todo 目前文件先只处理暂态事件的,后续有其他文件再做处理 - String fileName = appFileMessage.getMsg().getName(); - if(fileName.contains(".cfg") || fileName.contains(".dat")) { - FileStreamDto fileStreamDto = new FileStreamDto(); - String filePath; - Map map = new HashMap<>(); - //获取缓存的文件信息 - Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); - FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class); - //文件流 - Object object = redisUtil.getObjectByKey(fileName); - /* - * 文件解析存储逻辑 - * 1.如果文件只有1帧,那就直接解析文件流; - * 2.如果文件有多帧,判断当前帧是否是最后一帧,是则直接解析文件,不是则先缓存起来,等收完全再开始解析文件 - */ - if (Objects.isNull(object)){ - //第一次录入 - if(fileInfoDto.getNumber() == 1) { - //直接解析文件 - filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId()); - log.info(fileName + "解析成功"); - redisUtil.delete(fileName); - //波形文件关联事件 - filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,""); - correlateEvents(fileInfoDto,filePath); + //日志记录 + CsEventLogs csEventLogs = new CsEventLogs(); + csEventLogs.setNdid(appFileMessage.getId()); + csEventLogs.setFileName(appFileMessage.getMsg().getName()); + try { + //todo 目前文件先只处理暂态事件的,后续有其他文件再做处理 + String fileName = appFileMessage.getMsg().getName(); + if(fileName.contains(".cfg") || fileName.contains(".dat")) { + DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + FileStreamDto fileStreamDto = new FileStreamDto(); + String filePath; + Map map = new HashMap<>(); + //获取缓存的文件信息 + Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName)); + FileInfoDto fileInfoDto = JSON.parseObject(JSON.toJSONString(fileInfo), FileInfoDto.class); + //文件流 + Object object = redisUtil.getObjectByKey(fileName); + /* + * 文件解析存储逻辑 + * 1.如果文件只有1帧,那就直接解析文件流; + * 2.如果文件有多帧,判断当前帧是否是最后一帧,是则直接解析文件,不是则先缓存起来,等收完全再开始解析文件 + */ + if (Objects.isNull(object)){ + //第一次录入 + if(fileInfoDto.getNumber() == 1) { + //直接解析文件 + filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId()); + redisUtil.delete(fileName); + //波形文件关联事件 + filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,""); + correlateEvents(fileInfoDto,filePath); + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件1帧,全部收到,解析成功!"); + csEventLogs.setCompleteTime(LocalDateTime.now()); + csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt)); + csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt)); + } else { + //缓存文件 + map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData()); + fileStreamDto.setMap(map); + redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L); + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!"); + csEventLogs.setCompleteTime(LocalDateTime.now()); + csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt)); + csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt)); + } } else { - //缓存文件 - map.put(appFileMessage.getMid(),appFileMessage.getMsg().getData()); - fileStreamDto.setMap(map); - redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L); + //分帧传递数据,需要校验收到的文件个数 + fileStreamDto = JSON.parseObject(JSON.toJSONString(object), FileStreamDto.class); + Map l1 = fileStreamDto.getMap(); + l1.put(appFileMessage.getMid(),appFileMessage.getMsg().getData()); + if (l1.size() == fileInfoDto.getNumber()){ + //解析文件 + filePath = fileStream(fileInfoDto.getNumber(),l1,null,fileName,appFileMessage.getId()); + redisUtil.delete(fileName); + //波形文件关联事件 + filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,""); + correlateEvents(fileInfoDto,filePath); + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件"+l1.size()+"帧,这是第"+l1.size()+"帧,全部收到,解析成功!"); + csEventLogs.setCompleteTime(LocalDateTime.now()); + csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt)); + csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt)); + } else { + //缓存 + fileStreamDto = new FileStreamDto(); + fileStreamDto.setMap(l1); + redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L); + csEventLogs.setStatus(1); + csEventLogs.setRemark("当前文件"+fileInfoDto.getNumber()+"帧,这是第"+appFileMessage.getMid()+"帧,记录成功!"); + csEventLogs.setCompleteTime(LocalDateTime.now()); + csEventLogs.setStartTime(LocalDateTime.parse(fileInfoDto.getStartTime(), fmt)); + csEventLogs.setEndTime(LocalDateTime.parse(fileInfoDto.getEndTime(), fmt)); + } } + //记录日志 + csEventLogsService.save(csEventLogs); } else { - //分帧传递数据,需要校验收到的文件个数 - fileStreamDto = JSON.parseObject(JSON.toJSONString(object), FileStreamDto.class); - Map l1 = fileStreamDto.getMap(); - l1.put(appFileMessage.getMid(),appFileMessage.getMsg().getData()); - if (l1.size() == fileInfoDto.getNumber()){ - //对数据进行排序 - // 将Map的Entry集合转换成List - List> entryList = new ArrayList<>(l1.entrySet()); - // 使用Comparator按Key进行排序 - entryList.sort(new Comparator>() { - @Override - public int compare(Map.Entry entry1, Map.Entry entry2) { - return entry1.getKey().compareTo(entry2.getKey()); - } - }); - //解析文件 - filePath = fileStream(fileInfoDto.getNumber(),l1,null,fileName,appFileMessage.getId()); - log.info(fileName + "解析成功"); - redisUtil.delete(fileName); - //波形文件关联事件 - filePath = filePath.replaceAll(GeneralConstant.CFG,"").replaceAll(GeneralConstant.DAT,""); - correlateEvents(fileInfoDto,filePath); - } else { - //缓存 - fileStreamDto = new FileStreamDto(); - fileStreamDto.setMap(l1); - redisUtil.saveByKeyWithExpire(fileName, fileStreamDto, 3600L); - } + //todo 处理其他文件 + log.info("暂未做其他文件处理"); } + } catch (Exception e){ + csEventLogs.setStatus(0); + csEventLogs.setRemark("文件解析失败,失败原因:" + e.getMessage()); + csEventLogs.setCompleteTime(LocalDateTime.now()); //记录日志 - - } else { - //todo 处理其他文件 - log.info("暂未做其他文件处理"); + csEventLogsService.save(csEventLogs); } } - /** * 请求文件流信息 */ @@ -233,6 +261,7 @@ public class FileServiceImpl implements IFileService { byteArray = bytes; } InputStream inputStream = new ByteArrayInputStream(byteArray); + //todo 此处需要做文件crc校验或者md5校验,目前不知道怎么处理,先放一下 String path = fileStorageUtil.uploadStreamSpecifyName(inputStream, OssPath.WAVE_DIR + folder + StrUtil.SLASH,fileName); try { inputStream.close(); @@ -277,6 +306,5 @@ public class FileServiceImpl implements IFileService { csEventParam.setEndTime(fileInfoDto.getEndTime()); csEventParam.setPath(path); csEventService.updateCsEvent(csEventParam); - } } diff --git a/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/FileCheckUtils.java b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/FileCheckUtils.java new file mode 100644 index 0000000..29fc592 --- /dev/null +++ b/iot-analysis/analysis-zl-event/zl-event-boot/src/main/java/com/njcn/zlevent/utils/FileCheckUtils.java @@ -0,0 +1,59 @@ +package com.njcn.zlevent.utils; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.zip.CRC32; + +/** + * 类的介绍:用于文件校验 + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/9/8 13:32 + */ + +public class FileCheckUtils { + + /** + * 32位CRC检验 + * @param inputStream 文件流 + * @return + * @throws IOException + */ + public static long calculateCRC32Checksum(InputStream inputStream) throws IOException { + CRC32 crc32 = new CRC32(); + // 用于读取文件内容的缓冲区 + byte[] buffer = new byte[1024]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + crc32.update(buffer, 0, bytesRead); + } + return crc32.getValue(); + } + + /** + * MD5检验 + * @param inputStream 文件流 + * @return + * @throws IOException + */ + public static String calculateMD5Checksum(InputStream inputStream) throws IOException, NoSuchAlgorithmException { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + // 用于读取文件内容的缓冲区 + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + md5.update(buffer, 0, bytesRead); + } + byte[] digest = md5.digest(); + // 将MD5摘要转换为十六进制字符串 + StringBuilder md5Checksum = new StringBuilder(); + for (byte b : digest) { + md5Checksum.append(String.format("%02x", b)); + } + return md5Checksum.toString(); + } +} diff --git a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java index 614c0b6..ce0b671 100644 --- a/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java +++ b/iot-message/message-boot/src/main/java/com/njcn/message/consumer/AppEventConsumer.java @@ -57,6 +57,9 @@ public class AppEventConsumer extends EnhanceConsumerMessageHandler