新增云协议相关内容

This commit is contained in:
xy
2025-09-16 18:31:55 +08:00
parent 1a3e443be1
commit 0fda2354eb
17 changed files with 1027 additions and 105 deletions

View File

@@ -35,6 +35,6 @@ public interface AskDeviceDataFeignClient {
HttpResult<String> askRealData(@RequestParam("nDid") String nDid, @RequestParam("idx") Integer idx, @RequestParam("clDId") Integer clDId);
@PostMapping("/askCldRealData")
HttpResult<String> askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId);
HttpResult<String> askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId, @RequestParam("nodeId") String nodeId);
}

View File

@@ -75,7 +75,7 @@ public class AskDeviceDataClientFallbackFactory implements FallbackFactory<AskDe
}
@Override
public HttpResult<String> askCldRealData(String devId, String lineId) {
public HttpResult<String> askCldRealData(String devId, String lineId, String nodeId) {
log.error("{}异常,降级处理,异常为:{}","询问云前置实时数据",cause.toString());
throw new BusinessException(finalExceptionEnum);
}

View File

@@ -128,11 +128,12 @@ public class AskDeviceDataController extends BaseController {
@ApiOperation("询问云前置实时数据")
@ApiImplicitParams({
@ApiImplicitParam(name = "devId", value = "装置id"),
@ApiImplicitParam(name = "lineId", value = "监测点id")
@ApiImplicitParam(name = "lineId", value = "监测点id"),
@ApiImplicitParam(name = "nodeId", value = "前置id")
})
public HttpResult<String> askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId){
public HttpResult<String> askCldRealData(@RequestParam("devId") String devId, @RequestParam("lineId") String lineId, @RequestParam("nodeId") String nodeId){
String methodDescribe = getMethodDescribe("askCldRealData");
askDeviceDataService.askCldRealData(devId,lineId);
askDeviceDataService.askCldRealData(devId,lineId,nodeId);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, null, methodDescribe);
}

View File

@@ -19,6 +19,7 @@ import com.njcn.csdevice.pojo.dto.DevDetailDTO;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.rt.pojo.dto.BaseRealDataSet;
import com.njcn.user.api.AppUserFeignClient;
@@ -120,6 +121,13 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
});
}
}
//云前置设备心跳丢失处理
if(expiredKey.startsWith(RedisKeyEnum.CLD_HEART_BEAT_KEY.getKey())){
String node = expiredKey.split(":")[1];
String nodeId = node.substring(0, node.length() - 1);
int processNo = Integer.parseInt(node.substring(node.length() - 1));
equipmentFeignClient.updateCldDevStatus(nodeId,processNo);
}
}
//主任务

View File

@@ -22,5 +22,5 @@ public interface AskDeviceDataService {
*/
void askRealData(String nDid, Integer idx, Integer size);
void askCldRealData(String devId, String lineId);
void askCldRealData(String devId, String lineId, String nodeId);
}

View File

@@ -213,14 +213,15 @@ public class AskDeviceDataServiceImpl implements AskDeviceDataService {
}
@Override
public void askCldRealData(String devId, String lineId) {
public void askCldRealData(String devId, String lineId, String nodeId) {
RealDataMessage realDataMessage = new RealDataMessage();
realDataMessage.setDevSeries(devId);
realDataMessage.setLine(lineId);
int lastDigit = Character.getNumericValue(lineId.charAt(lineId.length() - 1));
realDataMessage.setLine(lastDigit);
realDataMessage.setRealData(true);
realDataMessage.setSoeData(true);
realDataMessage.setLimit(20);
realDataMessageTemplate.sendMember(realDataMessage);
realDataMessageTemplate.sendMember(realDataMessage,nodeId);
}
public Object getDeviceMid(String nDid) {

View File

@@ -849,12 +849,18 @@ public class CsDevModelServiceImpl implements ICsDevModelService {
* 解析数据集、详细数据
*/
private void analysisDataSet(TemplateDto templateDto,String pId){
String code;
List<CsDataSet> setList = new ArrayList<>();
List<CsDataArray> arrayList = new ArrayList<>();
List<DataSetDto> dataSetList = templateDto.getDataSet();
String devType = templateDto.getDevType();
DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode(devType).getData();
String code = dictTreeFeignClient.queryById(dictTreeVO.getPid()).getData().getCode();
if (!DicDataEnum.DEV_CLD.getCode().equals(devType)){
DictTreeVO dictTreeVO = dictTreeFeignClient.queryByCode(devType).getData();
code = dictTreeFeignClient.queryById(dictTreeVO.getPid()).getData().getCode();
} else {
code = null;
}
//逻辑设备录入
if (CollectionUtil.isNotEmpty(dataSetList)){
dataSetList.forEach(item1->{

View File

@@ -371,7 +371,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
}
@Override
// @Transactional(rollbackFor = Exception.class)
@Transactional(rollbackFor = Exception.class)
public String wlDevRegister(String nDid) {
String result = "fail";
// 设备状态判断
@@ -404,7 +404,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
Thread.sleep(2000);
List<CsModelDto> modelList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid),CsModelDto.class);
if (CollUtil.isEmpty(modelList)) {
throwExceptionAndLog(AccessResponseEnum.MODEL_ERROR, logDto);
throwExceptionAndLog(nDid,AccessResponseEnum.MODEL_ERROR, logDto);
}
List<CsDataSet> list = csDataSetService.getDataSetData(modelList.get(0).getModelId());
list.forEach(item->{
@@ -459,6 +459,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
logDto.setResult(0);
logDto.setFailReason(e.getMessage());
csLogsFeignClient.addUserLog(logDto);
resetFactory(nDid);
throw new BusinessException(AccessResponseEnum.ACCESS_ERROR);
}
return result;
@@ -478,18 +479,18 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
DeviceLogDTO logDto = createLogDto("当前设备"+nDid+"状态判断");
CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid);
if (Objects.isNull(csEquipmentDeliveryVO.getNdid())) {
throwExceptionAndLog(AccessResponseEnum.NDID_NO_FIND, logDto);
throwExceptionAndLog(nDid,AccessResponseEnum.NDID_NO_FIND, logDto);
}
SysDicTreePO sysDicTreePo = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevType()).getData();
if (Objects.isNull(sysDicTreePo)) {
throwExceptionAndLog(AccessResponseEnum.DEV_NOT_FIND, logDto);
throwExceptionAndLog(nDid,AccessResponseEnum.DEV_NOT_FIND, logDto);
}
String code = sysDicTreePo.getCode();
if (!Objects.equals(code, DicDataEnum.PORTABLE.getCode())) {
throwExceptionAndLog(AccessResponseEnum.DEV_IS_NOT_PORTABLE, logDto);
throwExceptionAndLog(nDid,AccessResponseEnum.DEV_IS_NOT_PORTABLE, logDto);
}
if (!mqttUtil.judgeClientOnline("NJCN-" + nDid.substring(nDid.length() - 6))) {
throwExceptionAndLog(AccessResponseEnum.MISSING_CLIENT, logDto);
throwExceptionAndLog(nDid,AccessResponseEnum.MISSING_CLIENT, logDto);
}
}
@@ -503,7 +504,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid);
SysDicTreePO dictData = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevModel()).getData();
if (Objects.isNull(dictData)) {
throwExceptionAndLog(AccessResponseEnum.DEV_MODEL_NOT_FIND, logDto);
throwExceptionAndLog(nDid,AccessResponseEnum.DEV_MODEL_NOT_FIND, logDto);
}
String devModel = dictData.getCode();
zhiLianRegister(nDid,devModel);
@@ -518,10 +519,11 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
return logDto;
}
private void throwExceptionAndLog(AccessResponseEnum responseEnum, DeviceLogDTO logDto) {
private void throwExceptionAndLog(String nDid,AccessResponseEnum responseEnum, DeviceLogDTO logDto) {
logDto.setResult(0);
logDto.setFailReason(responseEnum.getMessage());
csLogsFeignClient.addUserLog(logDto);
resetFactory(nDid);
throw new BusinessException(responseEnum);
}

View File

@@ -1,49 +1,37 @@
package com.njcn;
import static org.junit.Assert.assertTrue;
import cn.hutool.core.util.IdUtil;
import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import cn.hutool.core.collection.CollUtil;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.graphbuilder.math.func.EFunction;
import com.njcn.access.AccessBootApplication;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.access.pojo.dto.mqtt.MqttClientDto;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.ICsTopicService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
import com.njcn.access.utils.MqttUtil;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.oss.utils.FileStorageUtil;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import io.lettuce.core.protocol.CompleteableCommand;
import com.njcn.system.api.DictTreeFeignClient;
import com.njcn.system.enums.DicDataEnum;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.eclipse.paho.client.mqttv3.*;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Array;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import static org.junit.Assert.assertTrue;
/**
* Unit test for simple App.
@@ -81,72 +69,410 @@ public class AppTest
private MqttUtil mqttUtil;
@Test
public void lossTest() {
final int[] mid = {2};
for (int i = 0; i < 2; i++) {
mid[0] = mid[0] + 1;
public void deleteRedis() {
redisUtil.deleteKeysByString("devModelKey:00B78DA800B011avg");
}
ScheduledFuture<?> runnableFuture = null;
@Resource
private ICsTopicService csTopicService;
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ExecutorService executor = Executors.newFixedThreadPool(10);
private static final long ACCESS_TIME = 20L;
@Resource
private DictTreeFeignClient dictTreeFeignClient;
/**
* 测试下载文件
*/
@Test
public void run1() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
System.out.println("轮询定时任务执行中!");
};
scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
}
@Test
public void run() {
Runnable task = () -> {
log.info("轮询定时任务执行中!");
CsEquipmentDeliveryPO po = new CsEquipmentDeliveryPO();
po.setNdid("00B78DA80103");
po.setDevType("8b45cf6b7f5266e777d07c166ad5fa77");
po.setStatus(2);
List<CsEquipmentDeliveryPO> list = Collections.singletonList(po);
if (CollUtil.isNotEmpty(list)) {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 将任务平均分配给10个子列表
List<List<CsEquipmentDeliveryPO>> subLists = new ArrayList<>();
int partitionSize = list.size() / 10;
for (int i = 0; i < 10; i++) {
int start = i * partitionSize;
int end = (i == 9) ? list.size() : start + partitionSize;
subLists.add(list.subList(start, end));
}
// 创建一个ExecutorService来处理这些任务
List<Future<Void>> futures = new ArrayList<>();
// 提交任务给线程池执行
for (int i = 0; i < 10; i++) {
int index = i;
futures.add(executor.submit(new Callable<Void>() {
@Override
public Void call() {
// accessDev(subLists.get(index));
System.out.println("123");
return null;
}
}));
}
// 等待所有任务完成
for (Future<Void> future : futures) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
// 关闭ExecutorService
executor.shutdown();
}
};
//第一次执行的时间为120s然后每隔120s执行一次
scheduler.scheduleAtFixedRate(task,0,20,TimeUnit.SECONDS);
}
public void accessDev(List<CsEquipmentDeliveryPO> list) {
if (CollUtil.isNotEmpty(list)) {
list.forEach(item->{
System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid());
//判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入
String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode();
if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(),1)) {
//csDeviceService.wlDevRegister(item.getNdid());
log.info("请先手动注册、接入");
} else {
String version = csTopicService.getVersion(item.getNdid());
if (Objects.isNull(version)) {
version = "V1";
}
csDeviceService.devAccessAskTemplate(item.getNdid(),version,1);
}
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1);
});
}
System.out.println("mid==:" + mid[0]);
}
@Test
public void test1() {
String clientName = "NJCN-A801C8";
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
System.out.println("mqttClient==:" + mqttClient);
}
@Test
public void test() {
// ReqAndResDto reqAndResParam = new ReqAndResDto();
// reqAndResParam.setMid(1);
// reqAndResParam.setDid(0);
// reqAndResParam.setPri(AccessEnum.FIRST_CHANNEL.getCode());
// reqAndResParam.setType(4866);
// publisher.send("/Dev/Data1/V1/123", new Gson().toJson(reqAndResParam),1,false);
// String key = String.valueOf(IdUtil.getSnowflake().nextId());
// System.out.println("key==:" + key);
// List<CsLinePO> csLinePoList = new ArrayList<>();
// CsLinePO po1 = new CsLinePO();
// po1.setPosition("1");
// CsLinePO po2= new CsLinePO();
// po2.setPosition("2");
// CsLinePO po3= new CsLinePO();
// po3.setPosition("3");
// CsLinePO po4= new CsLinePO();
// po4.setPosition("1");
// @Test
// public void lossTest() {
// final int[] mid = {2};
// for (int i = 0; i < 2; i++) {
// mid[0] = mid[0] + 1;
// }
// System.out.println("mid==:" + mid[0]);
// }
//
// csLinePoList.add(po1);
// csLinePoList.add(po2);
// csLinePoList.add(po3);
// csLinePoList.add(po4);
// List<String> l = csLinePoList.stream().map(CsLinePO::getPosition).collect(Collectors.toList());
// System.out.println("l===:" + l);
// List<String> lineList = l.stream().filter(e-> Collections.frequency(l,e) > 1).distinct().collect(Collectors.toList());
// System.out.println("lineList==:" + lineList);
// @Test
// public void test1() {
// String clientName = "NJCN-016AB3";
// boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
// System.out.println("mqttClient==:" + mqttClient);
// }
//
// @Test
// public void testAutoAccess() {
// List<CsEquipmentDeliveryPO> list = new ArrayList<>();
// //项目启动60s后发起自动接入
// Runnable task = () -> {
// long time1 = System.currentTimeMillis();
// List<CsEquipmentDeliveryPO> list1 = csEquipmentDeliveryService.getAll();
// for (int i = 0; i < 100; i++) {
// list.addAll(list1);
// }
// if (CollUtil.isNotEmpty(list)) {
// // 将任务平均分配给10个子列表
// List<List<CsEquipmentDeliveryPO>> subLists = new ArrayList<>();
// int partitionSize = list.size() / 10;
// for (int i = 0; i < 10; i++) {
// int start = i * partitionSize;
// int end = (i == 9) ? list.size() : start + partitionSize;
// subLists.add(list.subList(start, end));
// }
//
// // 创建一个ExecutorService来处理这些任务
// List<Future<Void>> futures = new ArrayList<>();
// // 提交任务给线程池执行
// for (int i = 0; i < 10; i++) {
// int index = i;
// futures.add(executor.submit(new Callable<Void>() {
// @Override
// public Void call() throws Exception {
// accessDev(subLists.get(index));
// return null;
// }
// }));
// }
// // 等待所有任务完成
// for (Future<Void> future : futures) {
// try {
// future.get();
// } catch (InterruptedException | ExecutionException e) {
// throw new RuntimeException(e);
// }
// }
// // 关闭ExecutorService
// executor.shutdown();
// scheduler.shutdown();
// }
// long time2 = System.currentTimeMillis();
// System.out.println("执行时间==" + (time2 - time1));
// };
// scheduler.schedule(task, ACCESS_TIME, TimeUnit.SECONDS);
// }
// String text = "TkosUFEsMTk5OQ0KNiw2QSwwRA0KMSxBz+C159G5LEEstefRuSxWLDAuMDYyMjU2LDAuMDAwMDAwLDAuMDAwMDAwLC0zMjc2NywzMjc2NywzODAsMzgwLFMNCjIsQs/gtefRuSxCLLXn0bksViwwLjA2MjI1NiwwLjAwMDAwMCwwLjAwMDAwMCwtMzI3NjcsMzI3NjcsMzgwLDM4MCxTDQozLEPP4LXn0bksQyy159G5LFYsMC4wNjIyNTYsMC4wMDAwMDAsMC4wMDAwMDAsLTMyNzY3LDMyNzY3LDM4MCwzODAsUw0KNCxBz+C158H3LEEstefB9yxBLDAuMDE1MjU5LDAuMDAwMDAwLDAuMDAwMDAwLC0zMjc2NywzMjc2NywyMDAsNSxTDQo1LELP4LXnwfcsQiy158H3LEEsMC4wMTUyNTksMC4wMDAwMDAsMC4wMDAwMDAsLTMyNzY3LDMyNzY3LDIwMCw1LFMNCjYsQ8/gtefB9yxDLLXnwfcsQSwwLjAxNTI1OSwwLjAwMDAwMCwwLjAwMDAwMCwtMzI3NjcsMzI3NjcsMjAwLDUsUw0KNTANCjENCjEyODAwLDcxNjgNCjA1LzA5LzIwMjMsMTU6NTQ6MDIuMTM2MDAwDQowNS8wOS8yMDIzLDE1OjU0OjAyLjIzNjAwMA0KQklOQVJZDQoxDQo=";
// byte[] byteArray = Base64.getDecoder().decode(text);
// InputStream inputStream = new ByteArrayInputStream(byteArray);
// fileStorageUtil.uploadStreamSpecifyName(inputStream, "configuration/","xuyang.cfg");
// public void accessDev(List<CsEquipmentDeliveryPO> list) {
// list.forEach(item->{
// try {
// System.out.println(Thread.currentThread().getName() + ": processing data " + item.getNdid());
// Thread.sleep(2000);
// String version = csTopicService.getVersion(item.getNdid());
// if (!Objects.isNull(version)){
// csDeviceService.devAccessAskTemplate(item.getNdid(),version,1);
// redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1);
// }
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// });
// }
// @Test
// @After
// public void test() {
// String nDid = "00B78D016AB5";
// String version = "V1";
// try {
// inputStream.close();
// } catch (IOException e) {
//// //装置没有心跳,则立马发起接入请求
//// csDeviceService.devAccessAskTemplate(nDid,version,1);
//// log.info("装置掉线3分钟发送接入请求");
//// Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
//// if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
//// throw new BusinessException(CommonResponseEnum.SUCCESS);
//// }
//// //心跳断连立马发起接入失败后1分钟再次发起请求请求3次
//// for (int i = 2; i < 5; i++) {
//// //接入再次失败,则定时发起接入请求
//// Thread.sleep(1000 * 6);
//// csDeviceService.devAccessAskTemplate(nDid,version,i);
//// status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
//// if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
//// break;
//// }
//// log.info("装置定时1分钟发送接入请求第" + i + "次尝试");
//// }
// Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
// if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.OFFLINE.getCode())){
// final int[] mid = {5};
// runnableFuture = executor.scheduleAtFixedRate(() -> {
// csDeviceService.devAccessAskTemplate(nDid,version, mid[0]);
// Integer status2 = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
// if (Objects.equals(status2,AccessEnum.ONLINE.getCode())){
// runnableFuture.cancel(false);
// } else {
// mid[0] = mid[0] + 1;
// }
// //记录日志
// log.info("装置掉线,定时10分钟发送接入请求装置为:" + nDid + ",请求的时间戳为:" + System.currentTimeMillis());
// }, 1, 1, TimeUnit.SECONDS);
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
// @Test
// @After
// public void test2() {
// String nDid = "00B78D016AB5";
// String version = "V1";
// try {
// //装置没有心跳,则立马发起接入请求
// csDeviceService.devAccessAskTemplate(nDid,version,1);
// log.info("装置掉线3分钟发送接入请求");
// Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
// if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
// throw new BusinessException(CommonResponseEnum.SUCCESS);
// }
// //心跳断连立马发起接入失败后1分钟再次发起请求请求3次
// for (int i = 2; i < 5; i++) {
// //接入再次失败,则定时发起接入请求
// Thread.sleep(1000 * 6);
// csDeviceService.devAccessAskTemplate(nDid,version,i);
// status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
// if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
// break;
// }
// log.info("装置定时1分钟发送接入请求第" + i + "次尝试");
// }
// if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.OFFLINE.getCode())){
// final int[] mid = {5};
// ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
// runnableFuture = executor.scheduleAtFixedRate(() -> {
// csDeviceService.devAccessAskTemplate(nDid,version, mid[0]);
// Integer status2 = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
// if (Objects.equals(status2,AccessEnum.ONLINE.getCode())){
// runnableFuture.cancel(true);
// } else {
// mid[0] = mid[0] + 1;
// }
// //记录日志
// log.info("装置掉线,定时10分钟发送接入请求装置为:" + nDid + ",请求的时间戳为:" + System.currentTimeMillis());
// }, 1, 1, TimeUnit.SECONDS);
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
//
// @Test
// @After
// public void testDeviceAccess() {
// String nDid = "00B78D016AB5";
// String version = "V1";
// try {
// // 初次接入请求
// initiateDeviceAccess(nDid, version, 1);
// // 检查设备状态
// Integer status = checkDeviceStatus(nDid);
// if (status != null && Objects.equals(status, AccessEnum.ONLINE.getCode())) {
// throw new BusinessException(CommonResponseEnum.SUCCESS);
// }
// // 重试接入请求最多尝试3次
// attemptReconnect(nDid, version);
// // 如果设备仍然离线,开始定时任务发起接入请求
// if (status != null && Objects.equals(status, AccessEnum.OFFLINE.getCode())) {
// startScheduledReconnection(nDid, version);
// }
// } catch (Exception e) {
// log.error("Device access error", e);
// }
// }
//
// private static final String BROKER_URL = "tcp://192.168.1.27:1885";
// private static final String CLIENT_ID = "JavaAsyncPublisher";
// private static final int QOS = 1; // Quality of Service
// private static final int NUM_DEVICES = 10;
// private static final String TOPIC_PREFIX = "/Dev/Data/V1/";
// private static final int DEV_NUMS = 20;
//
// @Test
// public void test11() {
// MqttClient client = null;
// ExecutorService executor = Executors.newFixedThreadPool(NUM_DEVICES);
//
// try {
// client = new MqttClient(BROKER_URL, CLIENT_ID);
// MqttConnectOptions options = new MqttConnectOptions();
// options.setCleanSession(true);
// client.connect(options);
//
// client.setCallback(new MqttCallback() {
// @Override
// public void connectionLost(Throwable cause) {
// // Handle connection loss
// }
//
// @Override
// public void messageArrived(String topic, MqttMessage message) throws Exception {
// // Handle incoming messages (not used in this example)
// }
//
// @Override
// public void deliveryComplete(IMqttDeliveryToken token) {
// // Handle delivery completion
// System.out.println("Message delivery completed for token: " + token.isComplete());
// }
// });
//
// // Submit tasks to the executor service to send messages to each device
// for (int i = 1; i <= DEV_NUMS; i++) {
// final String deviceId = "00B78DA8000" + i;
// MqttClient finalClient = client;
// executor.submit(() -> {
// try {
// String topic = TOPIC_PREFIX + deviceId;
// String payload = "Message for device " + deviceId;
// MqttMessage message = new MqttMessage(payload.getBytes());
// message.setQos(QOS);
// finalClient.publish(topic, message);
// System.out.println("Sent message to topic: " + topic + " Message: " + payload);
// } catch (MqttException e) {
// e.printStackTrace();
// }
// });
// }
// } catch (MqttException e) {
// e.printStackTrace();
// } finally {
// if (client != null && client.isConnected()) {
// try {
// client.disconnect();
// } catch (MqttException e) {
// e.printStackTrace();
// }
// }
// }
// }
//
// private void initiateDeviceAccess(String nDid, String version, int attempt) {
// csDeviceService.devAccessAskTemplate(nDid, version, attempt);
// log.info("装置掉线3分钟发送接入请求");
// }
//
// private Integer checkDeviceStatus(String nDid) {
// return csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
// }
//
// private void attemptReconnect(String nDid, String version) throws InterruptedException {
// for (int i = 2; i < 5; i++) {
// Thread.sleep(1000 * 6); // 每 6 秒重试一次
// initiateDeviceAccess(nDid, version, i);
//
// Integer status = checkDeviceStatus(nDid);
// if (status != null && Objects.equals(status, AccessEnum.ONLINE.getCode())) {
// break;
// }
// log.info("装置定时1分钟发送接入请求第" + i + "次尝试");
// }
// }
//
// private void startScheduledReconnection(String nDid, String version) {
// final int[] attemptCounter = {5};
// ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
//
// Runnable reconnectTask = () -> {
// initiateDeviceAccess(nDid, version, attemptCounter[0]);
//
// Integer status = checkDeviceStatus(nDid);
// if (status != null && Objects.equals(status, AccessEnum.ONLINE.getCode())) {
// executor.shutdown(); // 关闭调度器
// } else {
// attemptCounter[0]++;
// }
// log.info("装置掉线,定时10分钟发送接入请求装置为:" + nDid
// + ",请求的时间戳为:" + System.currentTimeMillis());
// };
//
// executor.scheduleAtFixedRate(reconnectTask, 1, 1, TimeUnit.SECONDS);
// }
// 要计算CRC32的数据
String data = "Hello, World!";
CRC32 crc32 = new CRC32();
crc32.update(data.getBytes());
long crc32Value = crc32.getValue();
// 将CRC32校验值转换为16进制字符串
String crc32Str = String.format("%08X", crc32Value);
System.out.println("CRC32校验值为: " + crc32Str);
}
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,61 @@
package com.njcn;
import cn.hutool.core.collection.CollUtil;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@Component
@Slf4j
@RequiredArgsConstructor
public class TestXianCheng {
private static final long AUTO_TIME = 120L;
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
log.info("轮询定时任务执行中!");
ExecutorService executor = Executors.newFixedThreadPool(10);
// 创建一个ExecutorService来处理这些任务
List<Future<Void>> futures = new ArrayList<>();
// 提交任务给线程池执行
for (int i = 0; i < 10; i++) {
int index = i;
futures.add(executor.submit(new Callable<Void>() {
@Override
public Void call() {
access();
return null;
}
}));
}
// 等待所有任务完成
for (Future<Void> future : futures) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
// 关闭ExecutorService
executor.shutdown();
};
//第一次执行的时间为120s然后每隔120s执行一次
scheduler.scheduleAtFixedRate(task,0,1,TimeUnit.SECONDS);
}
public static void access() {
System.out.println("123");
}
}

View File

@@ -88,7 +88,7 @@ public class RtServiceImpl implements IRtService {
long timestamp = item.getDataTimeSec() - 8*3600;
baseRealDataSet.setDataTime(getTime(timestamp));
publisher.send("/Web/RealData/" + lineId, new Gson().toJson(baseRealDataSet), 1, false);
} else if (dataSet.getName().contains("实时数据集合")) {
} else if (dataSet.getName().contains("实时数据")) {
//用户Id
Object redisObject = redisUtil.getObjectByKey("rtDataUserId:"+lineId);
if (ObjectUtil.isNotNull(redisObject)) {

View File

@@ -101,6 +101,7 @@ public class StatServiceImpl implements IStatService {
//云前置设备
else if (Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)) {
lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
}
//获取当前设备信息
@@ -127,7 +128,8 @@ public class StatServiceImpl implements IStatService {
default:
break;
}
String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + dataArrayParam.getCldId() + dataArrayParam.getStatMethod() + dataArrayParam.getIdx());
int clDid = Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)?1:appAutoDataMessage.getMsg().getClDid();
String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getStatMethod() + dataArrayParam.getIdx());
Object object = redisUtil.getObjectByKey(key);
List<CsDataArray> dataArrayList;
if (Objects.isNull(object)){
@@ -135,10 +137,11 @@ public class StatServiceImpl implements IStatService {
} else {
dataArrayList = objectToList(object);
}
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess());
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess(),code);
recordList.addAll(result);
//获取时间
time = Instant.ofEpochSecond(item.getDataTimeSec()-8*3600)
long devTime = Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)?item.getDataTimeSec():item.getDataTimeSec()-8*3600;
time = Instant.ofEpochSecond(devTime)
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
}
@@ -196,7 +199,7 @@ public class StatServiceImpl implements IStatService {
/**
* influxDB数据组装
*/
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process) {
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType) {
List<String> records = new ArrayList<String>();
//解码
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
@@ -217,10 +220,11 @@ public class StatServiceImpl implements IStatService {
tags.put(InfluxDBTableConstant.CL_DID,clDid.toString());
tags.put(InfluxDBTableConstant.PROCESS,process.toString());
Map<String,Object> fields = new HashMap<>();
fields.put(dataArrayList.get(i).getName(),floats.get(i));
//这边特殊处理如果数据为3.14159则将数据置为null
fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时。
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
Point point = influxDbUtils.pointBuilder(tableName, Objects.equals(DicDataEnum.DEV_CLD.getCode(),devType)?item.getDataTimeSec():item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());

View File

@@ -2,6 +2,10 @@ package com.njcn;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.enums.TypeEnum;
import com.njcn.access.pojo.dto.ReqAndResDto;
import com.njcn.csharmonic.api.WavePicFeignClient;
import com.njcn.influx.pojo.constant.InfluxDBTableConstant;
import com.njcn.influx.utils.InfluxDbUtils;
@@ -15,6 +19,8 @@ import com.njcn.zlevent.ZlEventBootApplication;
import com.njcn.zlevent.pojo.constant.ZlConstant;
import com.njcn.zlevent.pojo.dto.FileStreamDto;
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.service.ICsWaveService;
import net.sf.json.JSONObject;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
@@ -59,6 +65,11 @@ public class AppTest {
@Resource
private InfluxDbUtils influxDbUtils;
@Resource
private MqttPublisher publisher;
@Resource
private ICsWaveService csWaveService;
/**
* Rigorous Test :-)
*/
@@ -68,6 +79,39 @@ public class AppTest {
assertTrue( true );
}
@Test
public void test00() {
long time = 1726237055L;
long subtleTime = 889000L;
Double millisecond = 1430.0;
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
time = time - 8*3600;
// 将millisecond转换为长整型并乘以1000以获取微秒
long millisecondValue = millisecond.longValue() * 1000;
long time1 = time * 1000000 + subtleTime;
long time2 = time * 1000000 + subtleTime + millisecondValue;
String time1String = String.valueOf(time1);
String time2String = String.valueOf(time2);
String time11 = time1String.substring(time1String.length() - 6);
String time111 = time1String.substring(0,time1String.length() - 6);
String formatTime1 = format.format(Long.parseLong(time111) * 1000);
String time22 = time2String.substring(time2String.length() - 6);
String time222 = time2String.substring(0,time2String.length() - 6);
String formatTime2 = format.format(Long.parseLong(time222) * 1000);
System.out.println(formatTime1 + "." + time11);
System.out.println(formatTime2 + "." + time22);
}
@Test
public void test11() {
String fileName = "/bd0/comtrade/PQS_PQM1_000063_20241029_101442_886";
boolean result = csWaveService.findCountByName(fileName);
System.out.println("result==:" + result);
}
@Test
public void test3() {
List<String> records = new ArrayList<String>();

View File

@@ -81,6 +81,17 @@
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>cs-device-api</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>com.njcn</groupId>
<artifactId>common-device-biz</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>

View File

@@ -0,0 +1,135 @@
package com.njcn.message.consumer;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.mq.constant.BusinessTopic;
import com.njcn.mq.constant.MessageStatus;
import com.njcn.mq.message.CldDeviceRunFlagMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 类的介绍:云前置状态反转
*
* @author xuyang
* @version 1.0.0
* @createTime 2025/9/16
*/
@Service
@RocketMQMessageListener(
topic = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
consumerGroup = BusinessTopic.DEVICE_RUN_FLAG_TOPIC,
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class CldDevRunFlagConsumer extends EnhanceConsumerMessageHandler<CldDeviceRunFlagMessage> implements RocketMQListener<CldDeviceRunFlagMessage> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Resource
private EquipmentFeignClient equipmentFeignClient;
@Override
protected void handleMessage(CldDeviceRunFlagMessage cldDeviceRunFlagMessage) {
log.info("分发至翻转设备状态");
int status = Objects.equals(cldDeviceRunFlagMessage.getStatus(),"0") ? 1 : 2;
equipmentFeignClient.flipCldDevStatus(cldDeviceRunFlagMessage.getId(), status);
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(CldDeviceRunFlagMessage message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(CldDeviceRunFlagMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(CldDeviceRunFlagMessage message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
/***
* 调用父类handler处理消息的元信息
*/
@Override
public void onMessage(CldDeviceRunFlagMessage appAutoDataMessage) {
super.dispatchMessage(appAutoDataMessage);
}
}

View File

@@ -0,0 +1,132 @@
package com.njcn.message.consumer;
import com.njcn.middle.rocket.constant.EnhanceMessageConstant;
import com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler;
import com.njcn.mq.constant.BusinessTopic;
import com.njcn.mq.constant.MessageStatus;
import com.njcn.mq.message.CldHeartBeatMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.pojo.enums.RedisKeyEnum;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.RocketMqLogFeignClient;
import com.njcn.system.pojo.po.RocketmqMsgErrorLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 类的介绍: 云前置心跳消息
*
* @author xuyang
* @version 1.0.0
* @createTime 2025/9/16
*/
@Service
@RocketMQMessageListener(
topic = BusinessTopic.HEART_BEAT_TOPIC,
consumerGroup = BusinessTopic.HEART_BEAT_TOPIC,
consumeThreadNumber = 10,
enableMsgTrace = true
)
@Slf4j
public class CldHeartConsumer extends EnhanceConsumerMessageHandler<CldHeartBeatMessage> implements RocketMQListener<CldHeartBeatMessage> {
@Resource
private RedisUtil redisUtil;
@Resource
private RocketMqLogFeignClient rocketMqLogFeignClient;
@Override
protected void handleMessage(CldHeartBeatMessage cldHeartBeatMessage) {
log.info("分发至云前置心跳数据");
String key = RedisKeyEnum.CLD_HEART_BEAT_KEY.getKey().concat(cldHeartBeatMessage.getNodeId() + cldHeartBeatMessage.getProcessNo());
redisUtil.saveByKeyWithExpire(key, cldHeartBeatMessage.getNodeId() + cldHeartBeatMessage.getProcessNo(), RedisKeyEnum.CLD_HEART_BEAT_KEY.getTime());
}
/***
* 通过redis分布式锁判断当前消息所处状态
* 1、null 查不到该key的数据属于第一次消费放行
* 2、fail 上次消息消费时发生异常,放行
* 3、being processed 正在处理,打回去
* 4、success 最近72小时消费成功避免重复消费打回去
*/
@Override
public boolean filter(CldHeartBeatMessage message) {
String keyStatus = redisUtil.getStringByKey(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()));
if (Objects.isNull(keyStatus) || keyStatus.equalsIgnoreCase(MessageStatus.FAIL)) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.BEING_PROCESSED, 60L);
return false;
}
return true;
}
/**
* 消费成功缓存到redis72小时避免重复消费
*/
@Override
protected void consumeSuccess(CldHeartBeatMessage message) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.SUCCESS, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
}
/**
* 发生异常时,进行错误信息入库保存
* 默认没有实现类子类可以实现该方法调用feign接口进行入库保存
*/
@Override
protected void saveExceptionMsgLog(CldHeartBeatMessage message, String identity, Exception exception) {
redisUtil.saveByKeyWithExpire(AppRedisKey.RMQ_CONSUME_KEY.concat(message.getKey()), MessageStatus.FAIL, RedisKeyEnum.ROCKET_MQ_KEY.getTime());
RocketmqMsgErrorLog rocketmqMsgErrorLog = new RocketmqMsgErrorLog();
rocketmqMsgErrorLog.setMsgKey(message.getKey());
rocketmqMsgErrorLog.setResource(message.getSource());
if (identity.equalsIgnoreCase(EnhanceMessageConstant.IDENTITY_SINGLE)) {
//数据库字段配置长度200避免插入失败大致分析异常原因
String exceptionMsg = exception.getMessage();
if(exceptionMsg.length() > 200){
exceptionMsg = exceptionMsg.substring(0,180);
}
rocketmqMsgErrorLog.setRecord(exceptionMsg);
//如果是当前消息重试的则略过
if(!message.getSource().startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
//单次消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
} else {
rocketmqMsgErrorLog.setRecord("重试消费" + super.getMaxRetryTimes() + "次,依旧消费失败。");
//重试N次后依然消费异常
rocketMqLogFeignClient.add(rocketmqMsgErrorLog);
}
}
/***
* 处理失败后,是否重试
* 一般开启
*/
@Override
protected boolean isRetry() {
return true;
}
/***
* 消费失败是否抛出异常,抛出异常后就不再消费了
*/
@Override
protected boolean throwException() {
return false;
}
/***
* 调用父类handler处理消息的元信息
*/
@Override
public void onMessage(CldHeartBeatMessage cldHeartBeatMessage) {
super.dispatchMessage(cldHeartBeatMessage);
}
}