From 2cad107c294363e7324b81da26f5b0c60b60b03b Mon Sep 17 00:00:00 2001 From: xy <748613696@qq.com> Date: Fri, 8 May 2026 16:30:41 +0800 Subject: [PATCH] =?UTF-8?q?feat(timer):=20=E6=B7=BB=E5=8A=A0MQTT=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E6=A3=80=E6=B5=8B=E5=AE=9A=E6=97=B6=E5=99=A8=E5=B9=B6?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增MqttHeartCheckTimer定时器,每小时轮询在线设备并进行心跳检测 - 在CsEquipmentDeliveryService中添加getUseOnlineDevice方法获取启用且在线的MQTT设备 --- .../listener/RedisKeyExpirationListener.java | 33 --- .../njcn/access/runner/AutoAccessTimer.java | 70 ------- .../access/runner/MqttHeartCheckTimer.java | 190 ++++++++++++++++++ .../service/ICsEquipmentDeliveryService.java | 5 + .../impl/CsEquipmentDeliveryServiceImpl.java | 14 +- .../stat/service/impl/StatServiceImpl.java | 69 ++++++- 6 files changed, 266 insertions(+), 115 deletions(-) create mode 100644 iot-access/access-boot/src/main/java/com/njcn/access/runner/MqttHeartCheckTimer.java diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java index ad3450b..24e57b6 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/listener/RedisKeyExpirationListener.java @@ -7,10 +7,7 @@ import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.access.enums.AccessEnum; import com.njcn.access.pojo.dto.NoticeUserDto; -import com.njcn.access.service.ICsDeviceOnlineLogsService; import com.njcn.access.service.ICsEquipmentDeliveryService; -import com.njcn.access.service.impl.CsDeviceServiceImpl; -import com.njcn.access.utils.MqttUtil; import com.njcn.access.utils.RedisSetUtil; import com.njcn.access.utils.SendMessageUtil; import com.njcn.common.pojo.dto.DeviceLogDTO; @@ -22,9 +19,6 @@ import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.rt.pojo.dto.BaseRealDataSet; -import com.njcn.user.api.AppInfoSetFeignClient; -import com.njcn.user.api.AppUserFeignClient; -import com.njcn.user.api.UserFeignClient; import com.njcn.user.pojo.po.User; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -53,24 +47,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene @Resource private ICsEquipmentDeliveryService csEquipmentDeliveryService; @Resource - private CsDeviceServiceImpl csDeviceService; - @Resource private CsLogsFeignClient csLogsFeignClient; @Resource - private ICsDeviceOnlineLogsService onlineLogsService; - @Resource - private MqttUtil mqttUtil; - @Resource private CsLedgerFeignClient csLedgerFeignclient; @Resource private EquipmentFeignClient equipmentFeignClient; @Resource - private AppUserFeignClient appUserFeignClient; - @Resource - private CsDeviceUserFeignClient csDeviceUserFeignClient; - @Resource - private UserFeignClient userFeignClient; - @Resource private RedisUtil redisUtil; @Resource private SendMessageUtil sendMessageUtil; @@ -81,20 +63,12 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene @Resource private RedisSetUtil redisSetUtil; @Resource - private AppInfoSetFeignClient appInfoSetFeignClient; - @Resource private DeviceMessageFeignClient deviceMessageFeignClient; - - private final Object lock = new Object(); - public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } - //最大告警次数 - private static int MAX_WARNING_TIMES = 0; - /** * 针对redis数据失效事件,进行数据处理 * 注意message.toString()可以获取失效的key @@ -124,13 +98,6 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene }); } } - //云前置设备心跳丢失处理 -// if(expiredKey.startsWith(RedisKeyEnum.CLD_HEART_BEAT_KEY.getKey())){ -// String node = expiredKey.split(":")[1]; -// String nodeId = node.substring(0, node.length() - 1); -// int processNo = Integer.parseInt(node.substring(node.length() - 1)); -// equipmentFeignClient.updateCldDevStatus(nodeId,processNo); -// } } //主任务 diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java index 97852ad..d8c166e 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/AutoAccessTimer.java @@ -161,74 +161,4 @@ public class AutoAccessTimer implements ApplicationRunner { } } -// @Override -// public void run(ApplicationArguments args) { -// if (scheduler.isShutdown() || scheduler.isTerminated()) { -// scheduler = Executors.newScheduledThreadPool(1); -// } -// Runnable task = () -> { -// log.info("轮询定时任务执行中!"); -// List list = csEquipmentDeliveryService.getOfflineDev(); -// if (CollUtil.isNotEmpty(list)) { -// ExecutorService executor = Executors.newFixedThreadPool(10); -// // 将任务平均分配给10个子列表 -// List> subLists = CollUtil.split(list, 10); -// // 创建一个ExecutorService来处理这些任务 -// List> futures = new ArrayList<>(); -// for (List subList : subLists) { -// futures.add(executor.submit(() -> { -// try { -// accessDev(subList); -// } catch (Exception e) { -// log.error("处理设备子列表异常,但继续处理其他任务", e); -// } -// return null; -// })); -// } -// // 等待所有任务完成 -// for (Future future : futures) { -// try { -// future.get(); -// } catch (InterruptedException e) { -// Thread.currentThread().interrupt(); -// log.error("任务被中断", e); -// } catch (ExecutionException e) { -// log.error("任务执行异常", e.getCause()); -// } catch (Exception e) { -// log.error("系统异常", e.getCause()); -// } -// } -// // 关闭ExecutorService -// executor.shutdown(); -// } -// }; -// //第一次执行的时间为120s,然后在前一个任务执行完毕后,等待120s再执行下一个任务 -// scheduler.scheduleWithFixedDelay(task, AUTO_TIME, AUTO_TIME, TimeUnit.SECONDS); -// } -// -// public void accessDev(List list) { -// if (CollUtil.isNotEmpty(list)) { -// try { -// list.forEach(item -> { -// System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid()); -// //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入 -// String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); -// if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(), 1)) { -// //csDeviceService.wlDevRegister(item.getNdid()); -// log.info("请先手动注册、接入"); -// } else { -// String version = csTopicService.getVersion(item.getNdid()); -// if (Objects.isNull(version)) { -// version = "V1"; -// } -// csDeviceService.autoAccess(item.getNdid(), version, 1); -// } -// redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(), 1); -// }); -// } catch (Exception e) { -// log.error(e.getMessage()); -// } -// } -// } - } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/runner/MqttHeartCheckTimer.java b/iot-access/access-boot/src/main/java/com/njcn/access/runner/MqttHeartCheckTimer.java new file mode 100644 index 0000000..907d875 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/runner/MqttHeartCheckTimer.java @@ -0,0 +1,190 @@ +package com.njcn.access.runner; + +import cn.hutool.core.collection.CollUtil; +import com.njcn.access.service.ICsEquipmentDeliveryService; +import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; +import com.njcn.redis.utils.RedisUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +/** + * @author xy + * 定时1小时轮询设备列表,并使用多线程进行业务处理 + * 1. 定时查询所有设备列表 + * 2. 使用多线程处理每个设备的业务逻辑 + * 3. 处理完成后释放资源 + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class MqttHeartCheckTimer implements ApplicationRunner { + + private final ICsEquipmentDeliveryService csEquipmentDeliveryService; + private final RedisUtil redisUtil; + private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + // 1小时间隔 + private static final long INTERVAL_MINUTES = 60L; + + + @Override + public void run(ApplicationArguments args) { + if (scheduler.isShutdown() || scheduler.isTerminated()) { + scheduler = Executors.newScheduledThreadPool(1); + } + + Runnable task = () -> { + try { + executeScheduledTask(); + } catch (Throwable t) { + log.error("定时设备处理任务发生严重异常", t); + } + }; + + // 每小时执行一次 + ScheduledFuture future = scheduler.scheduleWithFixedDelay( + task, + 0, + INTERVAL_MINUTES, + TimeUnit.MINUTES + ); + + // 添加监控,如果任务被取消则重新调度 + monitorScheduledTask(future); + } + + /** + * 监控定时任务 + */ + private void monitorScheduledTask(ScheduledFuture future) { + Thread monitorThread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + // 每10分钟检查一次任务状态 + Thread.sleep(600000); + if (future.isCancelled() || future.isDone()) { + log.warn("定时设备处理任务被取消或完成,重新调度..."); + // 重新启动任务 + run(null); + break; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("监控线程被中断"); + break; + } catch (Exception e) { + log.error("监控任务异常", e); + } + } + }, "Device-Processor-Monitor-Thread"); + monitorThread.setDaemon(true); + monitorThread.start(); + } + + /** + * 执行定时任务的主要逻辑 + */ + private void executeScheduledTask() { + log.info("开始执行定时设备处理任务(查看在线设备和心跳数据是否一致) - 时间: {}", System.currentTimeMillis()); + + try { + // 查询所有设备列表 + List deviceList = csEquipmentDeliveryService.getUseOnlineDevice(); + + if (CollUtil.isEmpty(deviceList)) { + log.info("设备列表为空,跳过处理"); + return; + } + + log.info("查询到 {} 个设备,开始多线程处理", deviceList.size()); + + // 创建线程池进行多线程处理,根据设备数量动态调整线程数 + // 最大10个线程 + int threadCount = Math.min(deviceList.size(), 10); + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + + try { + // 将设备列表分批处理,每批处理一定数量的设备 + List> batches = CollUtil.split(deviceList, threadCount); + List> futures = new ArrayList<>(); + + for (List batch : batches) { + futures.add(executor.submit(() -> { + try { + processDeviceBatch(batch); + } catch (Exception e) { + log.error("处理设备批次异常", e); + } + return null; + })); + } + + // 等待所有任务完成,设置超时时间防止长时间阻塞 + for (Future future : futures) { + try { + // 设置10分钟超时 + future.get(10, TimeUnit.MINUTES); + } catch (TimeoutException e) { + log.error("设备批次处理超时", e); + } catch (Exception e) { + log.error("设备批次处理异常", e); + } + } + } finally { + executor.shutdown(); + try { + if (!executor.awaitTermination(2, TimeUnit.MINUTES)) { + log.warn("设备处理线程池未在规定时间内关闭,强制关闭"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + log.warn("等待线程池关闭时被中断"); + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + log.info("设备批量处理任务完成 - 时间: {}", System.currentTimeMillis()); + } catch (Exception e) { + log.error("执行设备处理任务异常", e); + } + } + + /** + * 批量处理设备 + */ + private void processDeviceBatch(List deviceBatch) { + if (CollUtil.isEmpty(deviceBatch)) { + return; + } + for (CsEquipmentDeliveryPO device : deviceBatch) { + try { + processSingleDevice(device); + } catch (Exception e) { + log.error("处理单个设备失败: 设备ID={}, 错误={}", device.getNdid(), e.getMessage(), e); + } + } + } + + /** + * 处理单个设备的业务逻辑 + * 注意:这里需要根据实际业务需求实现具体的业务逻辑 + */ + private void processSingleDevice(CsEquipmentDeliveryPO device) { + log.info("正在处理设备: {}", device.getNdid()); + // 1. 检查设备在Redis中的状态 + String deviceKey = "MQTT:" + device.getNdid(); + Object deviceStatus = redisUtil.getObjectByKey(deviceKey); + // 2. 如果没有心跳,则模拟补充个心跳 + if (deviceStatus == null) { + // 如果Redis中没有该设备的状态信息,可以设置默认值或执行相应处理 + redisUtil.saveByKeyWithExpire(deviceKey, device.getNdid(), 100L); + } + log.info("设备处理完成: {}", device.getNdid()); + } +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java index 98c9f38..5c8992d 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/ICsEquipmentDeliveryService.java @@ -65,6 +65,11 @@ public interface ICsEquipmentDeliveryService extends IService getOnlineDev(); + /** + * 获取启用、系统在线、MQTT接入的装置 + */ + List getUseOnlineDevice(); + /** * 获取离线、启用、客户端在线的装置 */ diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java index 6113b1d..e783612 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsEquipmentDeliveryServiceImpl.java @@ -17,10 +17,7 @@ import lombok.RequiredArgsConstructor; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; +import java.util.*; import java.util.stream.Collectors; /** @@ -126,6 +123,15 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl getUseOnlineDevice() { + return this.lambdaQuery() + .eq(CsEquipmentDeliveryPO::getRunStatus,AccessEnum.ONLINE.getCode()) + .eq(CsEquipmentDeliveryPO::getDevAccessMethod,"MQTT") + .eq(CsEquipmentDeliveryPO::getUsageStatus,1) + .list(); + } + @Override public List getOfflineDev() { List result = new ArrayList<>(); diff --git a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java index 143cb1a..ca0ffe9 100644 --- a/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java +++ b/iot-analysis/analysis-stat/stat-boot/src/main/java/com/njcn/stat/service/impl/StatServiceImpl.java @@ -152,7 +152,7 @@ public class StatServiceImpl implements IStatService { } if (CollectionUtil.isNotEmpty(recordList)){ //influx数据批量入库 - influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, recordList); + influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.SECONDS, recordList); //记录监测点最新数据时间 CsLineLatestData csLineLatestData = new CsLineLatestData(); csLineLatestData.setLineId(lineId); @@ -192,23 +192,40 @@ public class StatServiceImpl implements IStatService { */ public List assembleData(String lineId,List dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType,String accessMethod) { List records = new ArrayList(); - //解码 List floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData())); if (CollectionUtil.isEmpty(floats)){ throw new BusinessException(StatResponseEnum.AUTO_DATA_NULL); } - //校验模板和解码数据数量能否对应上 if (!Objects.equals(dataArrayList.size(),floats.size())){ throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH); } Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); + + boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD"); + //fixme 捂脸设备上送的是北京时间,时序数据库录入时 需要utc时间,减去8小时 + long originalTimeSec = flag ? item.getDataTimeSec() : item.getDataTimeSec() - 8 * 3600; + if (originalTimeSec < 0) { + System.out.println("originalTimeSec==:" + originalTimeSec); + } + for (int i = 0; i < dataArrayList.size(); i++) { String tableName = map.get(dataArrayList.get(i).getName()); + long adjustedTimeSec; + + //短时闪变 || 电压波动 10分钟 + if (Objects.equals(tableName,"data_flicker") || Objects.equals(tableName,"data_fluc")) { + adjustedTimeSec = (originalTimeSec / 600) * 600; + } + //长时闪变 2小时 + else if (Objects.equals(tableName,"data_plt")) { + adjustedTimeSec = (originalTimeSec / 7200) * 7200; + } + else { + adjustedTimeSec = originalTimeSec; + } Map tags = new HashMap<>(); tags.put(InfluxDBTableConstant.LINE_ID,lineId); tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase()); - //todo 不清楚之前为啥要修改相别,这边按字典配置相别无法查询到数据,先改回来 - //tags.put(InfluxDBTableConstant.PHASIC_TYPE,Objects.isNull(PHASE_MAPPING.get(dataArrayList.get(i).getPhase()))?dataArrayList.get(i).getPhase():PHASE_MAPPING.get(dataArrayList.get(i).getPhase())); tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod); tags.put(InfluxDBTableConstant.CL_DID,clDid.toString()); tags.put(InfluxDBTableConstant.PROCESS,process.toString()); @@ -217,9 +234,8 @@ public class StatServiceImpl implements IStatService { //这边特殊处理,如果数据为3.14159,则将数据置为null fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i)); fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag()); - //fixme 设备上送的是北京时间,时序数据库录入时 需要utc时间,减去8小时 - boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD"); - Point point = influxDbUtils.pointBuilder(tableName, flag?item.getDataTimeSec():item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); + + Point point = influxDbUtils.pointBuilder(tableName, adjustedTimeSec, TimeUnit.SECONDS, tags, fields); BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); batchPoints.point(point); records.add(batchPoints.lineProtocol()); @@ -227,6 +243,43 @@ public class StatServiceImpl implements IStatService { return records; } +// public List assembleData(String lineId,List dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType,String accessMethod) { +// List records = new ArrayList(); +// //解码 +// List floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData())); +// if (CollectionUtil.isEmpty(floats)){ +// throw new BusinessException(StatResponseEnum.AUTO_DATA_NULL); +// } +// //校验模板和解码数据数量能否对应上 +// if (!Objects.equals(dataArrayList.size(),floats.size())){ +// throw new BusinessException(StatResponseEnum.ARRAY_DATA_NOT_MATCH); +// } +// Map map = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.ELE_EPD_PQD)), Map.class); +// for (int i = 0; i < dataArrayList.size(); i++) { +// String tableName = map.get(dataArrayList.get(i).getName()); +// Map tags = new HashMap<>(); +// tags.put(InfluxDBTableConstant.LINE_ID,lineId); +// tags.put(InfluxDBTableConstant.PHASIC_TYPE,dataArrayList.get(i).getPhase()); +// //todo 不清楚之前为啥要修改相别,这边按字典配置相别无法查询到数据,先改回来 +// //tags.put(InfluxDBTableConstant.PHASIC_TYPE,Objects.isNull(PHASE_MAPPING.get(dataArrayList.get(i).getPhase()))?dataArrayList.get(i).getPhase():PHASE_MAPPING.get(dataArrayList.get(i).getPhase())); +// tags.put(InfluxDBTableConstant.VALUE_TYPE,statMethod); +// tags.put(InfluxDBTableConstant.CL_DID,clDid.toString()); +// tags.put(InfluxDBTableConstant.PROCESS,process.toString()); +// tags.put(InfluxDBTableConstant.QUALITY_FLAG,"0"); +// Map fields = new HashMap<>(); +// //这边特殊处理,如果数据为3.14159,则将数据置为null +// fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i)); +// fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag()); +// //fixme 设备上送的是北京时间,时序数据库录入时 需要utc时间,减去8小时 +// boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD"); +// Point point = influxDbUtils.pointBuilder(tableName, flag?item.getDataTimeSec():item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields); +// BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build(); +// batchPoints.point(point); +// records.add(batchPoints.lineProtocol()); +// } +// return records; +// } + public List objectToList(Object object) { List urlList = new ArrayList<>(); if (object != null) {