1.录波文件下载处理;

2.装置异常告警事件处理
This commit is contained in:
xy
2024-09-13 20:41:18 +08:00
parent bb298501eb
commit 335997cdf6
28 changed files with 567 additions and 184 deletions

View File

@@ -45,7 +45,7 @@ public class AskDeviceDataClientFallbackFactory implements FallbackFactory<AskDe
@Override @Override
public HttpResult<Boolean> downloadFile(String nDid, String name, Integer size, String fileCheck) { public HttpResult<Boolean> downloadFile(String nDid, String name, Integer size, String fileCheck) {
log.error("{}异常,降级处理,异常为:{}","文件下载",cause.toString()); log.error("{}异常,降级处理,异常为:{}","文件下载",cause.toString());
redisUtil.delete("fileDowning"); redisUtil.delete("fileDowning:" + nDid);
redisUtil.delete("fileCheck"+name); redisUtil.delete("fileCheck"+name);
throw new BusinessException(finalExceptionEnum); throw new BusinessException(finalExceptionEnum);
} }

View File

@@ -70,6 +70,10 @@ public class AutoDataDto {
@ApiModelProperty("数据是否参与合格率统计") @ApiModelProperty("数据是否参与合格率统计")
private Integer dataTag; private Integer dataTag;
@SerializedName("Code")
@ApiModelProperty("事件码")
private Integer code;
@SerializedName("Data") @SerializedName("Data")
private String data; private String data;

View File

@@ -76,6 +76,10 @@ public class EventDto {
@ApiModelProperty("告警故障编码一般显示为Hex") @ApiModelProperty("告警故障编码一般显示为Hex")
private String code; private String code;
@SerializedName("DataTag")
@ApiModelProperty("数据标识1-标识数据异常")
private Integer dataTag;
@SerializedName("Parm") @SerializedName("Parm")
private List<Param> param; private List<Param> param;
} }

View File

@@ -638,6 +638,27 @@ public class MqttMessageHandler {
} }
} }
/**
* 装置异常事件记录
* @param topic
* @param message
* @param version
* @param nDid
* @param payload
*/
@MqttSubscribe(value = "/Dev/Error/{edgeId}",qos = 1)
@Transactional(rollbackFor = Exception.class)
public void devErrorInfo(String topic, MqttMessage message, @NamedValue("version") String version, @NamedValue("edgeId") String nDid, @Payload String payload) {
//解析数据
Gson gson = new Gson();
log.info(nDid + "事件报文为:" + new String(message.getPayload(), StandardCharsets.UTF_8));
EventDto eventDto = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), EventDto.class);
JSONObject jsonObject0 = JSONObject.parseObject(JSON.toJSONString(eventDto));
AppEventMessage appEventMessage = JSONObject.toJavaObject(jsonObject0, AppEventMessage.class);
appEventMessage.setId(nDid);
appEventMessageTemplate.sendMember(appEventMessage);
}
private void saveDirectoryInfo(List<FileDto.DirInfo> dirInfo, String key) { private void saveDirectoryInfo(List<FileDto.DirInfo> dirInfo, String key) {
if (!CollectionUtil.isEmpty(dirInfo)) { if (!CollectionUtil.isEmpty(dirInfo)) {
redisUtil.saveByKeyWithExpire(key, dirInfo, 10L); redisUtil.saveByKeyWithExpire(key, dirInfo, 10L);

View File

@@ -167,17 +167,17 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
return; return;
} else { } else {
logDto.setResult(0); logDto.setResult(0);
//重连三次仍未成功,则推送告警消息 //一个小时未连接上,则推送告警消息
MAX_WARNING_TIMES++; MAX_WARNING_TIMES++;
if (MAX_WARNING_TIMES == 3) { if (MAX_WARNING_TIMES == 30) {
NoticeUserDto dto2 = sendConnectFail(nDid); NoticeUserDto dto2 = sendConnectFail(nDid);
sendEventToUser(dto2); sendEventToUser(dto2);
} }
} }
} else { } else {
//重连三次仍未成功,则推送告警消息 //一个小时未连接上,则推送告警消息
MAX_WARNING_TIMES++; MAX_WARNING_TIMES++;
if (MAX_WARNING_TIMES == 3) { if (MAX_WARNING_TIMES == 30) {
NoticeUserDto dto2 = sendConnectFail(nDid); NoticeUserDto dto2 = sendConnectFail(nDid);
sendEventToUser(dto2); sendEventToUser(dto2);
} }

View File

@@ -84,7 +84,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService {
@Override @Override
public boolean downloadFile(String nDid, String name, Integer size, String fileCheck) { public boolean downloadFile(String nDid, String name, Integer size, String fileCheck) {
boolean result = true; boolean result = true;
redisUtil.saveByKeyWithExpire("fileDowning","fileDowning",300L); redisUtil.saveByKeyWithExpire("fileDowning:"+nDid,"fileDowning",300L);
redisUtil.saveByKeyWithExpire("fileCheck"+name,fileCheck,300L); redisUtil.saveByKeyWithExpire("fileCheck"+name,fileCheck,300L);
int length = size/51200 + 1; int length = size/51200 + 1;
try { try {
@@ -110,7 +110,7 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService {
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid); redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid);
} }
} catch (Exception e) { } catch (Exception e) {
redisUtil.delete("fileDowning"); redisUtil.delete("fileDowning:"+nDid);
redisUtil.delete("fileCheck"+name); redisUtil.delete("fileCheck"+name);
throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOAD_ERROR); throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOAD_ERROR);
} }

View File

@@ -170,7 +170,7 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
if (Objects.nonNull(object)) { if (Objects.nonNull(object)) {
throw new BusinessException(AlgorithmResponseEnum.FILE_UPLOADING); throw new BusinessException(AlgorithmResponseEnum.FILE_UPLOADING);
} }
Object object2 = redisUtil.getObjectByKey("fileDowning"); Object object2 = redisUtil.getObjectByKey("fileDowning:" + id);
if (Objects.nonNull(object2)) { if (Objects.nonNull(object2)) {
throw new BusinessException(AlgorithmResponseEnum.FILE_BUSY); throw new BusinessException(AlgorithmResponseEnum.FILE_BUSY);
} }
@@ -190,6 +190,9 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
//需要循环的次数 //需要循环的次数
int times = bytes.length / cap + 1; int times = bytes.length / cap + 1;
for (int i = 1; i <= times; i++) { for (int i = 1; i <= times; i++) {
//发送数据给前端
String json = "{allStep:\""+times+"\",nowStep:"+i+"}";
publisher.send("/Web/Progress", new Gson().toJson(json), 1, false);
DeviceLogDTO logDto = new DeviceLogDTO(); DeviceLogDTO logDto = new DeviceLogDTO();
byte[] lsBytes; byte[] lsBytes;
if (length > 50*1024) { if (length > 50*1024) {
@@ -219,6 +222,8 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
csLogsFeignClient.addUserLog(logDto); csLogsFeignClient.addUserLog(logDto);
} }
} else { } else {
String json = "{allStep:\""+1+"\",nowStep:"+1+"}";
publisher.send("/Web/Progress", new Gson().toJson(json), 1, false);
ReqAndResDto.Req req = getPojo(1,path,file,length,bytes,0,hexString); ReqAndResDto.Req req = getPojo(1,path,file,length,bytes,0,hexString);
publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false); publisher.send("/Pfm/DevFileCmd/" + version + "/" + id, new Gson().toJson(req), 1, false);
DeviceLogDTO logDto = new DeviceLogDTO(); DeviceLogDTO logDto = new DeviceLogDTO();

View File

@@ -15,4 +15,8 @@ public interface EventFeignClient {
@PostMapping("/analysis") @PostMapping("/analysis")
HttpResult<String> analysis(AppEventMessage appEventMessage); HttpResult<String> analysis(AppEventMessage appEventMessage);
@PostMapping("/errorEvent")
HttpResult<String> insertErrorEvent(AppEventMessage appEventMessage);
} }

View File

@@ -0,0 +1,19 @@
package com.njcn.zlevent.api;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.zlevent.api.fallback.EvtErrorClientFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
/**
* @author xy
*/
@FeignClient(value = ServerInfo.CS_ZL_EVENT_BOOT, path = "/csDevErrEvt", fallbackFactory = EvtErrorClientFallbackFactory.class,contextId = "csDevErrEvt")
public interface EvtErrorFeignClient {
@PostMapping("/errorEvent")
HttpResult<String> insertErrorEvent(AppEventMessage appEventMessage);
}

View File

@@ -3,14 +3,14 @@ package com.njcn.zlevent.api;
import com.njcn.common.pojo.constant.ServerInfo; import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.pojo.response.HttpResult;
import com.njcn.mq.message.AppEventMessage; import com.njcn.mq.message.AppEventMessage;
import com.njcn.zlevent.api.fallback.EventClientFallbackFactory; import com.njcn.zlevent.api.fallback.WaveClientFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
/** /**
* @author xy * @author xy
*/ */
@FeignClient(value = ServerInfo.CS_ZL_EVENT_BOOT, path = "/wave", fallbackFactory = EventClientFallbackFactory.class,contextId = "wave") @FeignClient(value = ServerInfo.CS_ZL_EVENT_BOOT, path = "/wave", fallbackFactory = WaveClientFallbackFactory.class,contextId = "wave")
public interface WaveFeignClient { public interface WaveFeignClient {
@PostMapping("/analysis") @PostMapping("/analysis")

View File

@@ -30,6 +30,12 @@ public class EventClientFallbackFactory implements FallbackFactory<EventFeignCli
log.error("{}异常,降级处理,异常为:{}","数据解析",cause.toString()); log.error("{}异常,降级处理,异常为:{}","数据解析",cause.toString());
throw new BusinessException(finalExceptionEnum); throw new BusinessException(finalExceptionEnum);
} }
@Override
public HttpResult<String> insertErrorEvent(AppEventMessage appEventMessage) {
log.error("{}异常,降级处理,异常为:{}","异常事件统计",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
}; };
} }
} }

View File

@@ -0,0 +1,35 @@
package com.njcn.zlevent.api.fallback;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.zlevent.api.EvtErrorFeignClient;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author xy
*/
@Slf4j
@Component
public class EvtErrorClientFallbackFactory implements FallbackFactory<EvtErrorFeignClient> {
@Override
public EvtErrorFeignClient create(Throwable cause) {
//判断抛出异常是否为解码器抛出的业务异常
Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
if (cause.getCause() instanceof BusinessException) {
BusinessException businessException = (BusinessException) cause.getCause();
}
Enum<?> finalExceptionEnum = exceptionEnum;
return new EvtErrorFeignClient() {
@Override
public HttpResult<String> insertErrorEvent(AppEventMessage appEventMessage) {
log.error("{}异常,降级处理,异常为:{}","异常事件统计",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -5,6 +5,7 @@ import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult; import com.njcn.common.pojo.response.HttpResult;
import com.njcn.mq.message.AppEventMessage; import com.njcn.mq.message.AppEventMessage;
import com.njcn.zlevent.api.EventFeignClient; import com.njcn.zlevent.api.EventFeignClient;
import com.njcn.zlevent.api.WaveFeignClient;
import feign.hystrix.FallbackFactory; import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@@ -14,16 +15,16 @@ import org.springframework.stereotype.Component;
*/ */
@Slf4j @Slf4j
@Component @Component
public class WaveClientFallbackFactory implements FallbackFactory<EventFeignClient> { public class WaveClientFallbackFactory implements FallbackFactory<WaveFeignClient> {
@Override @Override
public EventFeignClient create(Throwable cause) { public WaveFeignClient create(Throwable cause) {
//判断抛出异常是否为解码器抛出的业务异常 //判断抛出异常是否为解码器抛出的业务异常
Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK; Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
if (cause.getCause() instanceof BusinessException) { if (cause.getCause() instanceof BusinessException) {
BusinessException businessException = (BusinessException) cause.getCause(); BusinessException businessException = (BusinessException) cause.getCause();
} }
Enum<?> finalExceptionEnum = exceptionEnum; Enum<?> finalExceptionEnum = exceptionEnum;
return new EventFeignClient() { return new WaveFeignClient() {
@Override @Override
public HttpResult<String> analysis(AppEventMessage appEventMessage) { public HttpResult<String> analysis(AppEventMessage appEventMessage) {

View File

@@ -5,6 +5,7 @@ import lombok.Data;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* 类的介绍: * 类的介绍:
@@ -22,6 +23,6 @@ public class FileStreamDto implements Serializable {
private Integer frameLen; private Integer frameLen;
private List<Integer> list; private Set<Integer> list;
} }

View File

@@ -12,8 +12,12 @@ import lombok.Data;
@Data @Data
public class WaveTimeDto { public class WaveTimeDto {
private String fileName;
private String deviceId; private String deviceId;
private String nDid;
private String lineId; private String lineId;
private String startTime; private String startTime;

View File

@@ -0,0 +1,43 @@
package com.njcn.zlevent.pojo.po;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* <p>
* 装置异常事件统计
* </p>
*
* @author xy
* @since 2024-09-12
*/
@Data
@TableName("cs_dev_err_evt")
public class CsDevErrEvt {
private static final long serialVersionUID = 1L;
/**
* id
*/
private String id;
/**
* 设备识别码
*/
private String ndid;
/**
* 事件发生时间
*/
private LocalDateTime evtTime;
/**
* 事件code编码
*/
private String code;
}

View File

@@ -0,0 +1,50 @@
package com.njcn.zlevent.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.common.utils.HttpResultUtil;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.web.controller.BaseController;
import com.njcn.zlevent.service.ICsDevErrEvtService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* <p>
* 装置异常事件统计 前端控制器
* </p>
*
* @author xy
* @since 2024-09-12
*/
@RestController
@Slf4j
@RequestMapping("/csDevErrEvt")
@Api(tags = "装置异常事件处理")
@AllArgsConstructor
public class CsDevErrEvtController extends BaseController {
private final ICsDevErrEvtService csDevErrEvtService;
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/errorEvent")
@ApiOperation("异常事件统计")
@ApiImplicitParam(name = "appEventMessage", value = "数据实体", required = true)
public HttpResult<String> insertErrorEvent(@RequestBody AppEventMessage appEventMessage){
String methodDescribe = getMethodDescribe("insertErrorEvent");
csDevErrEvtService.insertErrorEvent(appEventMessage);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}
}

View File

@@ -36,7 +36,7 @@ public class WaveController extends BaseController {
@OperateInfo(info = LogEnum.BUSINESS_COMMON) @OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/analysis") @PostMapping("/analysis")
@ApiOperation("录波解析") @ApiOperation("录波事件")
@ApiImplicitParam(name = "appEventMessage", value = "数据实体", required = true) @ApiImplicitParam(name = "appEventMessage", value = "数据实体", required = true)
public HttpResult<String> analysis(@RequestBody AppEventMessage appEventMessage){ public HttpResult<String> analysis(@RequestBody AppEventMessage appEventMessage){
String methodDescribe = getMethodDescribe("analysis"); String methodDescribe = getMethodDescribe("analysis");

View File

@@ -8,13 +8,18 @@ import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum; import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.pojo.dto.file.FileRedisDto;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.enums.AlgorithmResponseEnum;
import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil; import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.enums.StatResponseEnum; import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.EpdFeignClient; import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.pojo.dto.EpdDTO; import com.njcn.system.pojo.dto.EpdDTO;
import com.njcn.zlevent.pojo.dto.FileStreamDto; import com.njcn.zlevent.pojo.dto.FileStreamDto;
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.service.ICsWaveAnalysisService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@@ -25,10 +30,7 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream; import java.util.stream.IntStream;
/** /**
@@ -48,6 +50,13 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
private CsTopicFeignClient csTopicFeignClient; private CsTopicFeignClient csTopicFeignClient;
@Resource @Resource
private MqttPublisher publisher; private MqttPublisher publisher;
@Resource
private ChannelObjectUtil channelObjectUtil;
@Resource
private ICsWaveAnalysisService iCsWaveAnalysisService;
private static Integer mid = 1;
private static Integer range = 51200;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer); super(listenerContainer);
} }
@@ -85,35 +94,103 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
int end = dto.getTotal(); int end = dto.getTotal();
IntStream.rangeClosed(start, end) IntStream.rangeClosed(start, end)
.filter(i -> !dto.getList().contains(i)) .filter(i -> !dto.getList().contains(i))
.forEach(missingNumber -> { .forEach(missingList::add);
log.info("缺失的数字:{}",missingNumber); Object object = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName));
missingList.add(missingNumber); if (CollectionUtil.isNotEmpty(missingList) && Objects.isNull(object)) {
}); downloadFile(missingList,dto.getNDid(),fileName);
redisUtil.saveByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName), missingList); }
Integer offset = (missingList.get(0) - 1) * dto.getFrameLen();
askMissingFileStream(dto.getNDid(),missingList.get(0),fileName,offset,dto.getFrameLen());
} }
} }
//请求缺失的数据
public void downloadFile( List<Integer> missingList, String nDid, String name) {
for (Integer missingNumber : missingList) {
int i = missingNumber - 1;
Object object = getDeviceMid(nDid);
if (!Objects.isNull(object)) {
mid = (Integer) object;
}
ReqAndResDto.Req reqAndResParam = getPojo(mid,name,i);
publisher.send("/Pfm/DevFileCmd/V1/"+nDid,new Gson().toJson(reqAndResParam),1,false);
//判断是否重发
sendNextStep(name,nDid,mid,i,nDid);
FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + name + mid);
//重发之后判断继续循环还是跳出循环
if (!Objects.isNull(fileRedisDto) && !Objects.equals(fileRedisDto.getCode(),200)) {
break;
}
mid = mid + 1;
if (mid > 10000) {
mid = 1;
}
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,mid);
}
}
public void askMissingFileStream(String nDid, Integer mid, String fileName, Integer offset, Integer len) { public Object getDeviceMid(String nDid) {
String version = csTopicFeignClient.find(nDid).getData(); return redisUtil.getObjectByKey(AppRedisKey.DEVICE_MID + nDid);
}
/**
* 文件下载请求报文
*/
public ReqAndResDto.Req getPojo(Integer mid, String fileName, Integer step) {
String json;
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
reqAndResParam.setMid(mid); reqAndResParam.setMid(mid);
reqAndResParam.setDid(0); reqAndResParam.setDid(0);
reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode()); reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_9.getCode())); reqAndResParam.setType(Integer.parseInt(TypeEnum.TYPE_9.getCode()));
reqAndResParam.setExpire(-1); reqAndResParam.setExpire(-1);
String json = "{Name:\""+fileName+"\",Offset:"+offset+",Len:"+len+"}"; json = "{Name:\""+fileName+"\",TransferMode:"+1+",Offset:"+(step*range)+",Len:"+range+"}";
JSONObject jsonObject = JSONObject.fromObject(json); JSONObject jsonObject = JSONObject.fromObject(json);
reqAndResParam.setMsg(jsonObject); reqAndResParam.setMsg(jsonObject);
publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); return reqAndResParam;
log.info("请求文件流报文:" + new Gson().toJson(reqAndResParam));
} }
/**
* 根据装置响应来判断是否询问下一帧数据
*/
public void sendNextStep(String fileName, String id, int mid,int step, String nDid) {
try {
for (int i = 1; i < 31; i++) {
if (step == 0 ){
Thread.sleep(5000);
} else {
Thread.sleep(2000);
}
FileRedisDto fileRedisDto = (FileRedisDto) redisUtil.getObjectByKey(AppRedisKey.DOWNLOAD + fileName + mid);
if (Objects.isNull(fileRedisDto)) {
FileRedisDto failDto = new FileRedisDto();
failDto.setCode(400);
redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,10L);
} else {
if (Objects.equals(fileRedisDto.getCode(),200)) {
redisUtil.delete("handleEvent:" + nDid);
redisUtil.delete(AppRedisKey.FILE_PART.concat(fileName));
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileName));
//删除已经处理完的文件,之后再判断还有是否需要下载的文件
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class);
fileDto.removeIf(item2 -> item2.getFileName().equals(fileName));
redisUtil.saveByKey("eventFile:" + nDid, fileDto);
if (CollectionUtil.isNotEmpty(fileDto)) {
iCsWaveAnalysisService.channelWave(nDid);
}
break;
} else {
log.info("" +i+"次尝试");
//尝试失败则设置code为400如果装置响应了则会将code置为200
FileRedisDto failDto = new FileRedisDto();
failDto.setCode(400);
redisUtil.saveByKeyWithExpire(AppRedisKey.DOWNLOAD + fileName + mid,failDto,10L);
ReqAndResDto.Req req = getPojo(mid,fileName,step);
publisher.send("/Pfm/DevFileCmd/V1" + id, new Gson().toJson(req), 1, false);
}
}
}
} catch (Exception e) {
throw new BusinessException(AlgorithmResponseEnum.ASK_DEVICE_DIR_ERROR);
}
}
} }

View File

@@ -0,0 +1,16 @@
package com.njcn.zlevent.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.zlevent.pojo.po.CsDevErrEvt;
/**
* <p>
* 装置异常事件统计 Mapper 接口
* </p>
*
* @author xy
* @since 2024-09-12
*/
public interface CsDevErrEvtMapper extends BaseMapper<CsDevErrEvt> {
}

View File

@@ -0,0 +1,23 @@
package com.njcn.zlevent.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.zlevent.pojo.po.CsDevErrEvt;
/**
* <p>
* 装置异常事件统计 服务类
* </p>
*
* @author xy
* @since 2024-09-12
*/
public interface ICsDevErrEvtService extends IService<CsDevErrEvt> {
/**
* 将装置推送的异常事件统计,目前先入库
* @param appEventMessage
*/
void insertErrorEvent(AppEventMessage appEventMessage);
}

View File

@@ -20,4 +20,6 @@ public interface ICsWaveAnalysisService {
*/ */
void analysis(AppEventMessage appEventMessage); void analysis(AppEventMessage appEventMessage);
void channelWave(String nDid);
} }

View File

@@ -2,7 +2,6 @@ package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
@@ -58,29 +57,20 @@ public class CsAlarmServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> im
List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray(); List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray();
for (AppEventMessage.DataArray item : dataArray) { for (AppEventMessage.DataArray item : dataArray) {
eventTime = eventService.timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); eventTime = eventService.timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
LambdaQueryWrapper<CsEventPO> wrapper = new LambdaQueryWrapper<>(); //事件入库
wrapper.eq(CsEventPO::getDeviceId,po.getId()) CsEventPO csEvent = new CsEventPO();
.eq(CsEventPO::getProcess,po.getProcess()) csEvent.setId(id);
.eq(CsEventPO::getCode,item.getCode()) csEvent.setDeviceId(po.getId());
.eq(CsEventPO::getStartTime,eventTime) csEvent.setProcess(po.getProcess());
.eq(CsEventPO::getTag,item.getName()); csEvent.setCode(item.getCode());
List<CsEventPO> list = csEventService.list(wrapper); csEvent.setStartTime(eventTime);
if (CollectionUtil.isEmpty(list)) { tag = item.getName();
//事件入库 csEvent.setTag(tag);
CsEventPO csEvent = new CsEventPO(); csEvent.setType(3);
csEvent.setId(id); csEvent.setClDid(appEventMessage.getMsg().getClDid());
csEvent.setDeviceId(po.getId()); csEvent.setLevel(Integer.parseInt(item.getType()));
csEvent.setProcess(po.getProcess()); csEvent.setCode(item.getCode());
csEvent.setCode(item.getCode()); list1.add(csEvent);
csEvent.setStartTime(eventTime);
tag = item.getName();
csEvent.setTag(tag);
csEvent.setType(3);
csEvent.setClDid(appEventMessage.getMsg().getClDid());
csEvent.setLevel(Integer.parseInt(item.getType()));
csEvent.setCode(item.getCode());
list1.add(csEvent);
}
} }
if (CollectionUtil.isNotEmpty(list1)){ if (CollectionUtil.isNotEmpty(list1)){
csEventService.saveBatch(list1); csEventService.saveBatch(list1);

View File

@@ -0,0 +1,61 @@
package com.njcn.zlevent.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.zlevent.mapper.CsDevErrEvtMapper;
import com.njcn.zlevent.pojo.po.CsDevErrEvt;
import com.njcn.zlevent.service.ICsDevErrEvtService;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
/**
* <p>
* 装置异常事件统计 服务实现类
* </p>
*
* @author xy
* @since 2024-09-12
*/
@Service
public class CsDevErrEvtServiceImpl extends ServiceImpl<CsDevErrEvtMapper, CsDevErrEvt> implements ICsDevErrEvtService {
@Override
public void insertErrorEvent(AppEventMessage appEventMessage) {
List<CsDevErrEvt> list = new ArrayList<>();
List<AppEventMessage.DataArray> dataArrayList = appEventMessage.getMsg().getDataArray();
for (AppEventMessage.DataArray dataArray : dataArrayList) {
CsDevErrEvt evt = new CsDevErrEvt();
evt.setNdid(appEventMessage.getId());
evt.setEvtTime(timeFormat(dataArray.getDataTimeSec(),dataArray.getDataTimeUSec()));
evt.setCode(dataArray.getCode());
list.add(evt);
}
this.saveBatch(list);
}
/**
* 时间转换
*/
public LocalDateTime timeFormat(Long time1, Long time2) {
String time;
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
time1 = time1 - 8*3600;
long t1 = time1 * 1000000 + time2;
String time1String = String.valueOf(t1);
String time11 = time1String.substring(time1String.length() - 6);
String time111 = time1String.substring(0,time1String.length() - 6);
String formatTime1 = format.format(Long.parseLong(time111) * 1000);
if (time2 == 0){
time = formatTime1 + ".000000";
} else {
time = formatTime1 + "." + time11;
}
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
return LocalDateTime.parse(time, fmt);
}
}

View File

@@ -7,9 +7,11 @@ import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.enums.AccessEnum; import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum; import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.enums.AlgorithmResponseEnum;
import com.njcn.csdevice.pojo.po.CsLinePO; import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.mq.message.AppEventMessage; import com.njcn.mq.message.AppEventMessage;
import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.pojo.enums.AppRedisKey;
@@ -24,6 +26,7 @@ import com.njcn.zlevent.service.ICsWaveAnalysisService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
@@ -54,11 +57,11 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
private final DicDataFeignClient dicDataFeignClient; private final DicDataFeignClient dicDataFeignClient;
private final ChannelObjectUtil channelObjectUtil;
@Override @Override
public void analysis(AppEventMessage appEventMessage) { public void analysis(AppEventMessage appEventMessage) {
int mid = 1; List<WaveTimeDto> list = new ArrayList<>();
//获取监测点
String lineId = null;
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId()); Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
if (Objects.isNull(object1)){ if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId()); lineInfo(appEventMessage.getId());
@@ -69,30 +72,43 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
List<AppEventMessage.DataArray> dataArrayList = appEventMessage.getMsg().getDataArray(); List<AppEventMessage.DataArray> dataArrayList = appEventMessage.getMsg().getDataArray();
if (CollectionUtil.isNotEmpty(dataArrayList)){ if (CollectionUtil.isNotEmpty(dataArrayList)){
for (AppEventMessage.DataArray item : dataArrayList) { for (AppEventMessage.DataArray item : dataArrayList) {
//处理mid
if (Objects.equals(mid,10000)){
mid = 1;
}
List<AppEventMessage.Param> paramList = item.getParam(); List<AppEventMessage.Param> paramList = item.getParam();
Object object = paramList.stream().filter(item2 -> ZlConstant.WAVE_NAME.equals(item2.getName())).findFirst().get().getData(); Object object = paramList.stream().filter(item2 -> ZlConstant.WAVE_NAME.equals(item2.getName())).findFirst().get().getData();
Object object2 = paramList.stream().filter(item2 -> ZlConstant.WAVE_PARAM_RCDKEEPTIME.equals(item2.getName())).findFirst().get().getData(); Object object2 = paramList.stream().filter(item2 -> ZlConstant.WAVE_PARAM_RCDKEEPTIME.equals(item2.getName())).findFirst().get().getData();
Object object3 = paramList.stream().filter(item2 -> ZlConstant.WAVE_POSITION.equals(item2.getName())).findFirst().get().getData(); Object object3 = paramList.stream().filter(item2 -> ZlConstant.WAVE_POSITION.equals(item2.getName())).findFirst().get().getData();
if (Objects.equals(object3.toString(),ZlConstant.GRID)){ String lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid();
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("1").toString();
} else if (Objects.equals(object3.toString(),ZlConstant.LOAD)){
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("2").toString();
} else {
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
}
String fileName = object.toString().replaceAll("\\[","").replaceAll("]",""); String fileName = object.toString().replaceAll("\\[","").replaceAll("]","");
List<String> fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList()); List<String> fileList = Arrays.stream(fileName.split(",")).collect(Collectors.toList());
//获取到录波文件,将文件信息存储起来
for (String file : fileList) { for (String file : fileList) {
file = file.trim(); file = file.trim();
askFileInfo(appEventMessage.getId(),mid,file); WaveTimeDto dto = channelTimeRange(file,appEventMessage.getId(),item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId,object3.toString());
mid++; list.add(dto);
channelTimeRange(file,item.getDataTimeSec(),item.getDataTimeUSec(),(Double)object2,deviceId,lineId,object3.toString()); }
Object obj = redisUtil.getObjectByKey("eventFile:" + appEventMessage.getId());
if (!Objects.isNull(obj)) {
List<WaveTimeDto> oldList = channelObjectUtil.objectToList(obj,WaveTimeDto.class);
oldList.addAll(list);
redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), oldList);
} else {
redisUtil.saveByKey("eventFile:" + appEventMessage.getId(), list);
} }
} }
//处理缓存数据
channelWave(appEventMessage.getId());
}
}
@Override
@Async("asyncExecutor")
public void channelWave(String nDid) {
//有相同文件处理时,则不进行数据处理
Object obj = redisUtil.getObjectByKey("handleEvent:" + nDid);
if (Objects.isNull(obj)) {
WaveTimeDto dto = channelObjectUtil.objectToList( redisUtil.getObjectByKey("eventFile:" + nDid),WaveTimeDto.class).get(0);
askFileInfo(nDid,1,dto.getFileName());
} else {
throw new BusinessException(AlgorithmResponseEnum.FILE_DOWNLOADING);
} }
} }
@@ -114,10 +130,11 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false); publisher.send("/Pfm/DevFileCmd/"+version+"/"+nDid,new Gson().toJson(reqAndResParam),1,false);
} }
/** /**
* 时间处理 * 时间处理
*/ */
public void channelTimeRange(String fileName, long time, long subtleTime, Double millisecond, String deviceId, String lineId, String location) { public WaveTimeDto channelTimeRange(String fileName,String nDid, long time, long subtleTime, Double millisecond, String deviceId, String lineId, String location) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
time = time - 8*3600; time = time - 8*3600;
// 将millisecond转换为长整型并乘以1000以获取微秒 // 将millisecond转换为长整型并乘以1000以获取微秒
@@ -136,12 +153,14 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
String formatTime2 = format.format(Long.parseLong(time222) * 1000); String formatTime2 = format.format(Long.parseLong(time222) * 1000);
WaveTimeDto waveTimeDto = new WaveTimeDto(); WaveTimeDto waveTimeDto = new WaveTimeDto();
waveTimeDto.setFileName(fileName);
waveTimeDto.setStartTime(formatTime1 + "." + time11); waveTimeDto.setStartTime(formatTime1 + "." + time11);
waveTimeDto.setEndTime(formatTime2 + "." + time22); waveTimeDto.setEndTime(formatTime2 + "." + time22);
waveTimeDto.setDeviceId(deviceId); waveTimeDto.setDeviceId(deviceId);
waveTimeDto.setNDid(nDid);
waveTimeDto.setLineId(lineId); waveTimeDto.setLineId(lineId);
waveTimeDto.setLocation(location); waveTimeDto.setLocation(location);
redisUtil.saveByKeyWithExpire(AppRedisKey.TIME+fileName,waveTimeDto,600L); return waveTimeDto;
} }
/** /**

View File

@@ -3,7 +3,6 @@ package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient; import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.api.EquipmentFeignClient;
@@ -94,85 +93,75 @@ public class EventServiceImpl implements IEventService {
List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray(); List<AppEventMessage.DataArray> dataArray = appEventMessage.getMsg().getDataArray();
for (AppEventMessage.DataArray item : dataArray) { for (AppEventMessage.DataArray item : dataArray) {
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
//此处做限制,如果已有事件则不在记录通知 id = IdUtil.fastSimpleUUID();
LambdaQueryWrapper<CsEventPO> wrapper = new LambdaQueryWrapper<>(); //事件入库
wrapper.eq(CsEventPO::getDeviceId,po.getId()) CsEventPO csEvent = new CsEventPO();
.eq(CsEventPO::getProcess,po.getProcess()) csEvent.setId(id);
.eq(CsEventPO::getCode,item.getCode()) csEvent.setDeviceId(po.getId());
.eq(CsEventPO::getStartTime,eventTime) csEvent.setProcess(po.getProcess());
.eq(CsEventPO::getTag,item.getName()); csEvent.setCode(item.getCode());
List<CsEventPO> list = csEventService.list(wrapper); eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec());
if (CollectionUtil.isEmpty(list)) { csEvent.setStartTime(eventTime);
id = IdUtil.fastSimpleUUID(); tag = item.getName();
//事件入库 csEvent.setTag(tag);
CsEventPO csEvent = new CsEventPO(); if (Objects.equals(item.getType(),"2")){
csEvent.setId(id); csEvent.setType(0);
csEvent.setDeviceId(po.getId()); } else if (Objects.equals(item.getType(),"3")){
csEvent.setProcess(po.getProcess()); csEvent.setType(1);
csEvent.setCode(item.getCode()); } else if (Objects.equals(item.getType(),"1")){
eventTime = timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()); csEvent.setType(2);
csEvent.setStartTime(eventTime); lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
tag = item.getName(); csEvent.setClDid(appEventMessage.getMsg().getClDid());
csEvent.setTag(tag);
if (Objects.equals(item.getType(),"2")){
csEvent.setType(0);
} else if (Objects.equals(item.getType(),"3")){
csEvent.setType(1);
} else if (Objects.equals(item.getType(),"1")){
csEvent.setType(2);
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0").toString();
csEvent.setClDid(appEventMessage.getMsg().getClDid());
}
csEvent.setLevel(Integer.parseInt(item.getType()));
//参数入库
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
if (!Objects.isNull(item.getParam())){
String tableName = map.get(item.getName());
List<AppEventMessage.Param> params = item.getParam();
for (AppEventMessage.Param param : params) {
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.UUID,id);
Map<String,Object> fields = new HashMap<>();
if (Objects.equals(ZlConstant.EVENT_POSITION,param.getName())){
if (Objects.equals(param.getData(),ZlConstant.GRID)){
fields.put(param.getName(),"电网侧");
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("1").toString();
csEvent.setClDid(1);
} else if (Objects.equals(param.getData(),ZlConstant.LOAD)){
fields.put(param.getName(),"负载侧");
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("2").toString();
csEvent.setClDid(2);
}
csEvent.setLocation(param.getData().toString());
} else {
if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_TM)){
csEvent.setPersistTime(Double.parseDouble(param.getData().toString()));
}
lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid();
fields.put(param.getName(),appEventMessage.getMsg().getClDid()==1?"电网侧":"负载侧");
csEvent.setLocation(appEventMessage.getMsg().getClDid()==1?ZlConstant.GRID:ZlConstant.LOAD);
csEvent.setClDid(appEventMessage.getMsg().getClDid());
fields.put(param.getName(),param.getData());
}
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
}
}
csEvent.setLineId(lineId);
list1.add(csEvent);
//事件处理日志库
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
csEventLogs.setTag(item.getName());
csEventLogs.setStatus(1);
csEventLogs.setTime(LocalDateTime.now());
csEventLogsService.save(csEventLogs);
} }
csEvent.setLevel(Integer.parseInt(item.getType()));
//参数入库
Map<String,String> map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class);
if (!Objects.isNull(item.getParam())){
String tableName = map.get(item.getName());
List<AppEventMessage.Param> params = item.getParam();
for (AppEventMessage.Param param : params) {
Map<String, String> tags = new HashMap<>();
tags.put(InfluxDBTableConstant.UUID,id);
Map<String,Object> fields = new HashMap<>();
if (Objects.equals(ZlConstant.EVENT_POSITION,param.getName())){
if (Objects.equals(param.getData(),ZlConstant.GRID)){
fields.put(param.getName(),"电网侧");
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("1").toString();
csEvent.setClDid(1);
} else if (Objects.equals(param.getData(),ZlConstant.LOAD)){
fields.put(param.getName(),"负载侧");
lineId = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("2").toString();
csEvent.setClDid(2);
}
csEvent.setLocation(param.getData().toString());
} else {
if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_TM)){
csEvent.setPersistTime(Double.parseDouble(param.getData().toString()));
}
lineId = appEventMessage.getId() + appEventMessage.getMsg().getClDid();
fields.put(param.getName(),appEventMessage.getMsg().getClDid()==1?"电网侧":"负载侧");
csEvent.setLocation(appEventMessage.getMsg().getClDid()==1?ZlConstant.GRID:ZlConstant.LOAD);
csEvent.setClDid(appEventMessage.getMsg().getClDid());
fields.put(param.getName(),param.getData());
}
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());
}
}
csEvent.setLineId(lineId);
list1.add(csEvent);
//事件处理日志库
CsEventLogs csEventLogs = new CsEventLogs();
csEventLogs.setLineId(lineId);
csEventLogs.setDeviceId(po.getId());
csEventLogs.setStartTime(timeFormat(item.getDataTimeSec(),item.getDataTimeUSec()));
csEventLogs.setTag(item.getName());
csEventLogs.setStatus(1);
csEventLogs.setTime(LocalDateTime.now());
csEventLogsService.save(csEventLogs);
} }
//cs_event入库 //cs_event入库
if (CollectionUtil.isNotEmpty(list1)){ if (CollectionUtil.isNotEmpty(list1)){

View File

@@ -12,6 +12,7 @@ import com.njcn.access.enums.AccessResponseEnum;
import com.njcn.access.enums.TypeEnum; import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.utils.CRC32Utils; import com.njcn.access.utils.CRC32Utils;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.config.GeneralInfo; import com.njcn.common.config.GeneralInfo;
import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csharmonic.api.WavePicFeignClient; import com.njcn.csharmonic.api.WavePicFeignClient;
@@ -27,10 +28,7 @@ import com.njcn.zlevent.pojo.dto.FileStreamDto;
import com.njcn.zlevent.pojo.dto.WaveTimeDto; import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.pojo.po.CsEventFileLogs; import com.njcn.zlevent.pojo.po.CsEventFileLogs;
import com.njcn.zlevent.pojo.po.CsWave; import com.njcn.zlevent.pojo.po.CsWave;
import com.njcn.zlevent.service.ICsEventFileLogsService; import com.njcn.zlevent.service.*;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.service.ICsWaveService;
import com.njcn.zlevent.service.IFileService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
@@ -54,22 +52,16 @@ import java.util.*;
public class FileServiceImpl implements IFileService { public class FileServiceImpl implements IFileService {
private final RedisUtil redisUtil; private final RedisUtil redisUtil;
private final CsTopicFeignClient csTopicFeignClient; private final CsTopicFeignClient csTopicFeignClient;
private final MqttPublisher publisher; private final MqttPublisher publisher;
private final FileStorageUtil fileStorageUtil; private final FileStorageUtil fileStorageUtil;
private final ICsEventService csEventService; private final ICsEventService csEventService;
private final ICsEventFileLogsService csEventLogsService; private final ICsEventFileLogsService csEventLogsService;
private final WavePicFeignClient wavePicFeignClient; private final WavePicFeignClient wavePicFeignClient;
private final ICsWaveService csWaveService; private final ICsWaveService csWaveService;
private final GeneralInfo generalInfo; private final GeneralInfo generalInfo;
private final ICsWaveAnalysisService iCsWaveAnalysisService;
private final ChannelObjectUtil channelObjectUtil;
@Override @Override
public void analysisFileInfo(AppFileMessage appFileMessage) { public void analysisFileInfo(AppFileMessage appFileMessage) {
@@ -80,12 +72,7 @@ public class FileServiceImpl implements IFileService {
String fileName = appFileMessage.getMsg().getFileInfo().getName(); String fileName = appFileMessage.getMsg().getFileInfo().getName();
//缓存文件信息用于文件流拼接 //缓存文件信息用于文件流拼接
FileInfoDto fileInfoDto = new FileInfoDto(); FileInfoDto fileInfoDto = new FileInfoDto();
//获取波形文件起始结束时间 WaveTimeDto waveTimeDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class).get(0);
Object fileInfo = redisUtil.getObjectByKey(AppRedisKey.TIME+fileName);
if (Objects.isNull(fileInfo)){
throw new BusinessException(AccessResponseEnum.WAVE_INFO_MISSING);
}
WaveTimeDto waveTimeDto = JSON.parseObject(JSON.toJSONString(fileInfo), WaveTimeDto.class);
fileInfoDto.setStartTime(waveTimeDto.getStartTime()); fileInfoDto.setStartTime(waveTimeDto.getStartTime());
fileInfoDto.setEndTime(waveTimeDto.getEndTime()); fileInfoDto.setEndTime(waveTimeDto.getEndTime());
fileInfoDto.setDeviceId(waveTimeDto.getDeviceId()); fileInfoDto.setDeviceId(waveTimeDto.getDeviceId());
@@ -110,7 +97,7 @@ public class FileServiceImpl implements IFileService {
csWave.setStatus(0); csWave.setStatus(0);
csWaveService.save(csWave); csWaveService.save(csWave);
//请求当前文件的数据 //请求当前文件的数据
askFileStream(appFileMessage.getId(),mid,fileName,0,range); askFileStream(appFileMessage.getId(),mid,fileName,-1,range);
redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto); redisUtil.saveByKey(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()), fileInfoDto);
redisUtil.delete(AppRedisKey.TIME+fileName); redisUtil.delete(AppRedisKey.TIME+fileName);
} else { } else {
@@ -122,7 +109,7 @@ public class FileServiceImpl implements IFileService {
public void analysisFileStream(AppFileMessage appFileMessage) { public void analysisFileStream(AppFileMessage appFileMessage) {
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
String filePath = null; String filePath = null;
List<Integer> list = new ArrayList<>(); Set<Integer> list = new HashSet<>();
FileStreamDto fileStreamDto = new FileStreamDto(); FileStreamDto fileStreamDto = new FileStreamDto();
//波形文件上传成功后,将文件信息存储一下,方便后期查看 //波形文件上传成功后,将文件信息存储一下,方便后期查看
CsWave csWave = new CsWave(); CsWave csWave = new CsWave();
@@ -139,7 +126,7 @@ public class FileServiceImpl implements IFileService {
String lsFileName = generalInfo.getBusinessTempPath() + File.separator + fileName.split(StrUtil.SLASH)[fileName.split(StrUtil.SLASH).length - 1]; String lsFileName = generalInfo.getBusinessTempPath() + File.separator + fileName.split(StrUtil.SLASH)[fileName.split(StrUtil.SLASH).length - 1];
File lsFile =new File(generalInfo.getBusinessTempPath()); File lsFile =new File(generalInfo.getBusinessTempPath());
//如果文件夹不存在则创建 //如果文件夹不存在则创建
if (!lsFile.exists() && !lsFile.isDirectory()){ if (!lsFile.exists() && !lsFile.isDirectory()) {
lsFile .mkdirs(); lsFile .mkdirs();
} }
//获取缓存的文件信息 //获取缓存的文件信息
@@ -180,6 +167,7 @@ public class FileServiceImpl implements IFileService {
//2.缓存了判断收到的报文个数是否和总个数一致,一致则解析文件;不一致则更新缓存 //2.缓存了判断收到的报文个数是否和总个数一致,一致则解析文件;不一致则更新缓存
//3.超时判断: 30s未收到相关文件信息核查文件个数看丢失哪些帧重新请求 //3.超时判断: 30s未收到相关文件信息核查文件个数看丢失哪些帧重新请求
else { else {
redisUtil.saveByKey("handleEvent:" + appFileMessage.getId(),"doing");
if (appFileMessage.getMsg().getFrameTotal() == 1){ if (appFileMessage.getMsg().getFrameTotal() == 1){
//解析文件入库 //解析文件入库
filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId(),fileInfoDto.getFileCheck(),"event"); filePath = fileStream(1,null,appFileMessage.getMsg().getData(),fileName,appFileMessage.getId(),fileInfoDto.getFileCheck(),"event");
@@ -197,8 +185,18 @@ public class FileServiceImpl implements IFileService {
if (CollectionUtil.isNotEmpty(eventList)){ if (CollectionUtil.isNotEmpty(eventList)){
eventList.forEach(wavePicFeignClient::getWavePics); eventList.forEach(wavePicFeignClient::getWavePics);
} }
redisUtil.delete("handleEvent:" + appFileMessage.getId());
redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName())); redisUtil.delete(AppRedisKey.RMQ_FILE_CONSUME_KEY.concat(fileInfoDto.getName()));
//删除已经处理完的文件,之后再判断还有是否需要下载的文件
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class);
fileDto.removeIf(item -> item.getFileName().equals(fileName));
redisUtil.saveByKey("eventFile:" + appFileMessage.getId(), fileDto);
if (CollectionUtil.isNotEmpty(fileDto)) {
iCsWaveAnalysisService.channelWave(appFileMessage.getId());
}
} else { } else {
//收到数据就刷新缓存值
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 10L);
Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName)); Object object1 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART.concat(fileName));
if (Objects.isNull(object1)){ if (Objects.isNull(object1)){
fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal()); fileStreamDto.setTotal(appFileMessage.getMsg().getFrameTotal());
@@ -211,7 +209,6 @@ public class FileServiceImpl implements IFileService {
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal()); csEventLogs.setAllStep(appFileMessage.getMsg().getFrameTotal());
csEventLogs.setIsAll(0); csEventLogs.setIsAll(0);
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 5L);
redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto); redisUtil.saveByKey(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()), fileStreamDto);
//将数据写入临时文件 //将数据写入临时文件
appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData()); appendFile(lsFileName,appFileMessage.getMsg().getFrameCurr(),appFileMessage.getMsg().getData());
@@ -245,13 +242,20 @@ public class FileServiceImpl implements IFileService {
redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName())); redisUtil.delete(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName())); redisUtil.delete(AppRedisKey.FILE_PART.concat(appFileMessage.getMsg().getName()));
redisUtil.delete(AppRedisKey.FILE_PART_MISSING.concat(appFileMessage.getMsg().getName())); redisUtil.delete(AppRedisKey.FILE_PART_MISSING.concat(appFileMessage.getMsg().getName()));
redisUtil.delete("handleEvent:" + appFileMessage.getId());
//删除临时文件 //删除临时文件
File file = new File(lsFileName); File file = new File(lsFileName);
if (file.exists()) { if (file.exists()) {
file.delete(); file.delete();
} }
//删除已经处理完的文件,之后再判断还有是否需要下载的文件
List<WaveTimeDto> fileDto = channelObjectUtil.objectToList(redisUtil.getObjectByKey("eventFile:" + appFileMessage.getId()),WaveTimeDto.class);
fileDto.removeIf(item -> item.getFileName().equals(fileName));
redisUtil.saveByKey("eventFile:" + appFileMessage.getId(), fileDto);
if (CollectionUtil.isNotEmpty(fileDto)) {
iCsWaveAnalysisService.channelWave(appFileMessage.getId());
}
} else { } else {
Object object2 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName));
csEventLogs.setStatus(1); csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!"); csEventLogs.setRemark("当前文件" + appFileMessage.getMsg().getFrameTotal() + "帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,记录成功!");
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());
@@ -261,14 +265,12 @@ public class FileServiceImpl implements IFileService {
//将数据写入临时文件 //将数据写入临时文件
appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData()); appendFile(lsFileName, appFileMessage.getMsg().getFrameCurr(), appFileMessage.getMsg().getData());
log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr()); log.info("当前文件 {} 帧,这是第 {} 帧报文,记录成功", appFileMessage.getMsg().getFrameTotal(), appFileMessage.getMsg().getFrameCurr());
if (Objects.isNull(object2)) { Object object2 = redisUtil.getObjectByKey(AppRedisKey.FILE_PART_MISSING.concat(fileName));
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 10L); if (!Objects.isNull(object2)) {
} else {
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 1L); redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 1L);
} }
} }
} else { } else {
redisUtil.saveByKeyWithExpire(AppRedisKey.FILE_PART_TIME.concat(appFileMessage.getMsg().getName()), null, 1L);
csEventLogs.setStatus(1); csEventLogs.setStatus(1);
csEventLogs.setRemark("当前文件为重复帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,不做记录!"); csEventLogs.setRemark("当前文件为重复帧,这是第" + appFileMessage.getMsg().getFrameCurr() + "帧,不做记录!");
csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr()); csEventLogs.setNowStep(appFileMessage.getMsg().getFrameCurr());

View File

@@ -11,6 +11,7 @@ import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog; import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import com.njcn.zlevent.api.AlarmFeignClient; import com.njcn.zlevent.api.AlarmFeignClient;
import com.njcn.zlevent.api.EventFeignClient; import com.njcn.zlevent.api.EventFeignClient;
import com.njcn.zlevent.api.EvtErrorFeignClient;
import com.njcn.zlevent.api.WaveFeignClient; import com.njcn.zlevent.api.WaveFeignClient;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@@ -48,6 +49,8 @@ public class AppEventConsumer extends EnhanceConsumerMessageHandler<AppEventMess
private WaveFeignClient waveFeignClient; private WaveFeignClient waveFeignClient;
@Resource @Resource
private AlarmFeignClient alarmFeignClient; private AlarmFeignClient alarmFeignClient;
@Resource
private EvtErrorFeignClient evtErrorFeignClient;
@Override @Override
protected void handleMessage(AppEventMessage appEventMessage) { protected void handleMessage(AppEventMessage appEventMessage) {
@@ -68,6 +71,10 @@ public class AppEventConsumer extends EnhanceConsumerMessageHandler<AppEventMess
case 32: case 32:
log.info("分发至便携式基础数据处理"); log.info("分发至便携式基础数据处理");
break; break;
case 241:
log.info("分发装置异常事件统计");
evtErrorFeignClient.insertErrorEvent(appEventMessage);
break;
default: default:
break; break;
} }