package com.njcn; import cn.hutool.core.collection.CollUtil; import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.njcn.access.AccessBootApplication; import com.njcn.access.enums.AccessEnum; import com.njcn.access.service.ICsEquipmentDeliveryService; import com.njcn.access.service.ICsTopicService; import com.njcn.access.service.impl.CsDeviceServiceImpl; import com.njcn.access.utils.MqttUtil; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO; import com.njcn.oss.utils.FileStorageUtil; import com.njcn.redis.pojo.enums.AppRedisKey; import com.njcn.redis.utils.RedisUtil; import com.njcn.system.api.DictTreeFeignClient; import com.njcn.system.enums.DicDataEnum; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.web.WebAppConfiguration; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.*; import static org.junit.Assert.assertTrue; /** * Unit test for simple App. */ @RunWith(SpringRunner.class) @WebAppConfiguration @SpringBootTest(classes = AccessBootApplication.class) @Slf4j public class AppTest { /** * Rigorous Test :-) */ @Test public void shouldAnswerWithTrue() { assertTrue( true ); } @Resource private MqttPublisher publisher; @Resource private FileStorageUtil fileStorageUtil; @Resource private ICsEquipmentDeliveryService csEquipmentDeliveryService; @Resource private CsDeviceServiceImpl csDeviceService; private RedisUtil redisUtil; @Resource private MqttUtil mqttUtil; @Test public void deleteRedis() { redisUtil.deleteKeysByString("devModelKey:00B78DA800B011avg"); } ScheduledFuture runnableFuture = null; @Resource private ICsTopicService csTopicService; ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); ExecutorService executor = Executors.newFixedThreadPool(10); private static final long ACCESS_TIME = 20L; @Resource private DictTreeFeignClient dictTreeFeignClient; /** * 测试下载文件 */ @Test public void run1() { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); Runnable task = () -> { System.out.println("轮询定时任务执行中!"); }; scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS); } @Test public void run() { Runnable task = () -> { log.info("轮询定时任务执行中!"); CsEquipmentDeliveryPO po = new CsEquipmentDeliveryPO(); po.setNdid("00B78DA80103"); po.setDevType("8b45cf6b7f5266e777d07c166ad5fa77"); po.setStatus(2); List list = Collections.singletonList(po); if (CollUtil.isNotEmpty(list)) { ExecutorService executor = Executors.newFixedThreadPool(10); // 将任务平均分配给10个子列表 List> subLists = new ArrayList<>(); int partitionSize = list.size() / 10; for (int i = 0; i < 10; i++) { int start = i * partitionSize; int end = (i == 9) ? list.size() : start + partitionSize; subLists.add(list.subList(start, end)); } // 创建一个ExecutorService来处理这些任务 List> futures = new ArrayList<>(); // 提交任务给线程池执行 for (int i = 0; i < 10; i++) { int index = i; futures.add(executor.submit(new Callable() { @Override public Void call() { // accessDev(subLists.get(index)); System.out.println("123"); return null; } })); } // 等待所有任务完成 for (Future future : futures) { try { future.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } // 关闭ExecutorService executor.shutdown(); } }; //第一次执行的时间为120s,然后每隔120s执行一次 scheduler.scheduleAtFixedRate(task,0,20,TimeUnit.SECONDS); } public void accessDev(List list) { if (CollUtil.isNotEmpty(list)) { list.forEach(item->{ System.out.println(Thread.currentThread().getName() + ": auto : nDid : " + item.getNdid()); //判断设备类型 便携式设备需要特殊处理 未注册的要先注册、再接入;已注册的直接重新接入 String code = dictTreeFeignClient.queryById(item.getDevType()).getData().getCode(); if (Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && Objects.equals(item.getStatus(),1)) { //csDeviceService.wlDevRegister(item.getNdid()); log.info("请先手动注册、接入"); } else { String version = csTopicService.getVersion(item.getNdid()); if (Objects.isNull(version)) { version = "V1"; } csDeviceService.devAccessAskTemplate(item.getNdid(),version,1); } redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1); }); } } // @Test // public void lossTest() { // final int[] mid = {2}; // for (int i = 0; i < 2; i++) { // mid[0] = mid[0] + 1; // } // System.out.println("mid==:" + mid[0]); // } // // @Test // public void test1() { // String clientName = "NJCN-016AB3"; // boolean mqttClient = mqttUtil.judgeClientOnline(clientName); // System.out.println("mqttClient==:" + mqttClient); // } // // @Test // public void testAutoAccess() { // List list = new ArrayList<>(); // //项目启动60s后发起自动接入 // Runnable task = () -> { // long time1 = System.currentTimeMillis(); // List list1 = csEquipmentDeliveryService.getAll(); // for (int i = 0; i < 100; i++) { // list.addAll(list1); // } // if (CollUtil.isNotEmpty(list)) { // // 将任务平均分配给10个子列表 // List> subLists = new ArrayList<>(); // int partitionSize = list.size() / 10; // for (int i = 0; i < 10; i++) { // int start = i * partitionSize; // int end = (i == 9) ? list.size() : start + partitionSize; // subLists.add(list.subList(start, end)); // } // // // 创建一个ExecutorService来处理这些任务 // List> futures = new ArrayList<>(); // // 提交任务给线程池执行 // for (int i = 0; i < 10; i++) { // int index = i; // futures.add(executor.submit(new Callable() { // @Override // public Void call() throws Exception { // accessDev(subLists.get(index)); // return null; // } // })); // } // // 等待所有任务完成 // for (Future future : futures) { // try { // future.get(); // } catch (InterruptedException | ExecutionException e) { // throw new RuntimeException(e); // } // } // // 关闭ExecutorService // executor.shutdown(); // scheduler.shutdown(); // } // long time2 = System.currentTimeMillis(); // System.out.println("执行时间==:" + (time2 - time1)); // }; // scheduler.schedule(task, ACCESS_TIME, TimeUnit.SECONDS); // } // public void accessDev(List list) { // list.forEach(item->{ // try { // System.out.println(Thread.currentThread().getName() + ": processing data " + item.getNdid()); // Thread.sleep(2000); // String version = csTopicService.getVersion(item.getNdid()); // if (!Objects.isNull(version)){ // csDeviceService.devAccessAskTemplate(item.getNdid(),version,1); // redisUtil.saveByKey(AppRedisKey.DEVICE_MID + item.getNdid(),1); // } // } catch (InterruptedException e) { // throw new RuntimeException(e); // } // }); // } // @Test // @After // public void test() { // String nDid = "00B78D016AB5"; // String version = "V1"; // try { //// //装置没有心跳,则立马发起接入请求 //// csDeviceService.devAccessAskTemplate(nDid,version,1); //// log.info("装置掉线3分钟发送接入请求"); //// Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); //// if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ //// throw new BusinessException(CommonResponseEnum.SUCCESS); //// } //// //心跳断连立马发起接入失败后,1分钟再次发起请求,请求3次 //// for (int i = 2; i < 5; i++) { //// //接入再次失败,则定时发起接入请求 //// Thread.sleep(1000 * 6); //// csDeviceService.devAccessAskTemplate(nDid,version,i); //// status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); //// if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ //// break; //// } //// log.info("装置定时1分钟发送接入请求,第" + i + "次尝试"); //// } // Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); // if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.OFFLINE.getCode())){ // final int[] mid = {5}; // runnableFuture = executor.scheduleAtFixedRate(() -> { // csDeviceService.devAccessAskTemplate(nDid,version, mid[0]); // Integer status2 = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); // if (Objects.equals(status2,AccessEnum.ONLINE.getCode())){ // runnableFuture.cancel(false); // } else { // mid[0] = mid[0] + 1; // } // //记录日志 // log.info("装置掉线,定时10分钟发送接入请求,装置为:" + nDid + ",请求的时间戳为:" + System.currentTimeMillis()); // }, 1, 1, TimeUnit.SECONDS); // } // } catch (Exception e) { // e.printStackTrace(); // } // } // @Test // @After // public void test2() { // String nDid = "00B78D016AB5"; // String version = "V1"; // try { // //装置没有心跳,则立马发起接入请求 // csDeviceService.devAccessAskTemplate(nDid,version,1); // log.info("装置掉线3分钟发送接入请求"); // Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); // if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ // throw new BusinessException(CommonResponseEnum.SUCCESS); // } // //心跳断连立马发起接入失败后,1分钟再次发起请求,请求3次 // for (int i = 2; i < 5; i++) { // //接入再次失败,则定时发起接入请求 // Thread.sleep(1000 * 6); // csDeviceService.devAccessAskTemplate(nDid,version,i); // status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); // if (Objects.equals(status,AccessEnum.ONLINE.getCode())){ // break; // } // log.info("装置定时1分钟发送接入请求,第" + i + "次尝试"); // } // if (!Objects.isNull(status) && Objects.equals(status,AccessEnum.OFFLINE.getCode())){ // final int[] mid = {5}; // ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); // runnableFuture = executor.scheduleAtFixedRate(() -> { // csDeviceService.devAccessAskTemplate(nDid,version, mid[0]); // Integer status2 = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); // if (Objects.equals(status2,AccessEnum.ONLINE.getCode())){ // runnableFuture.cancel(true); // } else { // mid[0] = mid[0] + 1; // } // //记录日志 // log.info("装置掉线,定时10分钟发送接入请求,装置为:" + nDid + ",请求的时间戳为:" + System.currentTimeMillis()); // }, 1, 1, TimeUnit.SECONDS); // } // } catch (Exception e) { // e.printStackTrace(); // } // } // // @Test // @After // public void testDeviceAccess() { // String nDid = "00B78D016AB5"; // String version = "V1"; // try { // // 初次接入请求 // initiateDeviceAccess(nDid, version, 1); // // 检查设备状态 // Integer status = checkDeviceStatus(nDid); // if (status != null && Objects.equals(status, AccessEnum.ONLINE.getCode())) { // throw new BusinessException(CommonResponseEnum.SUCCESS); // } // // 重试接入请求,最多尝试3次 // attemptReconnect(nDid, version); // // 如果设备仍然离线,开始定时任务发起接入请求 // if (status != null && Objects.equals(status, AccessEnum.OFFLINE.getCode())) { // startScheduledReconnection(nDid, version); // } // } catch (Exception e) { // log.error("Device access error", e); // } // } // // private static final String BROKER_URL = "tcp://192.168.1.27:1885"; // private static final String CLIENT_ID = "JavaAsyncPublisher"; // private static final int QOS = 1; // Quality of Service // private static final int NUM_DEVICES = 10; // private static final String TOPIC_PREFIX = "/Dev/Data/V1/"; // private static final int DEV_NUMS = 20; // // @Test // public void test11() { // MqttClient client = null; // ExecutorService executor = Executors.newFixedThreadPool(NUM_DEVICES); // // try { // client = new MqttClient(BROKER_URL, CLIENT_ID); // MqttConnectOptions options = new MqttConnectOptions(); // options.setCleanSession(true); // client.connect(options); // // client.setCallback(new MqttCallback() { // @Override // public void connectionLost(Throwable cause) { // // Handle connection loss // } // // @Override // public void messageArrived(String topic, MqttMessage message) throws Exception { // // Handle incoming messages (not used in this example) // } // // @Override // public void deliveryComplete(IMqttDeliveryToken token) { // // Handle delivery completion // System.out.println("Message delivery completed for token: " + token.isComplete()); // } // }); // // // Submit tasks to the executor service to send messages to each device // for (int i = 1; i <= DEV_NUMS; i++) { // final String deviceId = "00B78DA8000" + i; // MqttClient finalClient = client; // executor.submit(() -> { // try { // String topic = TOPIC_PREFIX + deviceId; // String payload = "Message for device " + deviceId; // MqttMessage message = new MqttMessage(payload.getBytes()); // message.setQos(QOS); // finalClient.publish(topic, message); // System.out.println("Sent message to topic: " + topic + " Message: " + payload); // } catch (MqttException e) { // e.printStackTrace(); // } // }); // } // } catch (MqttException e) { // e.printStackTrace(); // } finally { // if (client != null && client.isConnected()) { // try { // client.disconnect(); // } catch (MqttException e) { // e.printStackTrace(); // } // } // } // } // // private void initiateDeviceAccess(String nDid, String version, int attempt) { // csDeviceService.devAccessAskTemplate(nDid, version, attempt); // log.info("装置掉线3分钟发送接入请求"); // } // // private Integer checkDeviceStatus(String nDid) { // return csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus(); // } // // private void attemptReconnect(String nDid, String version) throws InterruptedException { // for (int i = 2; i < 5; i++) { // Thread.sleep(1000 * 6); // 每 6 秒重试一次 // initiateDeviceAccess(nDid, version, i); // // Integer status = checkDeviceStatus(nDid); // if (status != null && Objects.equals(status, AccessEnum.ONLINE.getCode())) { // break; // } // log.info("装置定时1分钟发送接入请求,第" + i + "次尝试"); // } // } // // private void startScheduledReconnection(String nDid, String version) { // final int[] attemptCounter = {5}; // ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); // // Runnable reconnectTask = () -> { // initiateDeviceAccess(nDid, version, attemptCounter[0]); // // Integer status = checkDeviceStatus(nDid); // if (status != null && Objects.equals(status, AccessEnum.ONLINE.getCode())) { // executor.shutdown(); // 关闭调度器 // } else { // attemptCounter[0]++; // } // log.info("装置掉线,定时10分钟发送接入请求,装置为:" + nDid // + ",请求的时间戳为:" + System.currentTimeMillis()); // }; // // executor.scheduleAtFixedRate(reconnectTask, 1, 1, TimeUnit.SECONDS); // } }