mqtt依赖包引入

This commit is contained in:
xy
2026-04-21 19:21:35 +08:00
parent fed766bca4
commit d8b292d447
6 changed files with 105 additions and 99 deletions

View File

@@ -1,74 +1,74 @@
package com.njcn.csdevice.utils; //package com.njcn.csdevice.utils;
//
import org.eclipse.paho.client.mqttv3.*; //import org.eclipse.paho.client.mqttv3.*;
//
import java.io.File; //import java.io.File;
import java.io.FileInputStream; //import java.io.FileInputStream;
import java.io.FileOutputStream; //import java.io.FileOutputStream;
import java.io.IOException; //import java.io.IOException;
import java.nio.file.Files; //import java.nio.file.Files;
import java.nio.file.Paths; //import java.nio.file.Paths;
//
/** ///**
* Description: // * Description:
* Date: 2023/8/2 13:41【需求编号】 // * Date: 2023/8/2 13:41【需求编号】
* // *
* @author clam // * @author clam
* @version V1.0.0 // * @version V1.0.0
*/ // */
public class MqttTest { //public class MqttTest {
private static final String MQTT_BROKER = "tcp://192.168.1.13:1883"; // private static final String MQTT_BROKER = "tcp://192.168.1.13:1883";
private static final String MQTT_TOPIC = "file/upload"; // private static final String MQTT_TOPIC = "file/upload";
private static final String FILE_PATH = "C:\\Users\\无名\\Desktop\\111.json"; // Replace with the path to your file // private static final String FILE_PATH = "C:\\Users\\无名\\Desktop\\111.json"; // Replace with the path to your file
//
public static void main(String[] args) { // public static void main(String[] args) {
MqttClient mqttClient = null; // MqttClient mqttClient = null;
try { // try {
// Connect to the MQTT broker // // Connect to the MQTT broker
mqttClient = new MqttClient(MQTT_BROKER, MqttClient.generateClientId()); // mqttClient = new MqttClient(MQTT_BROKER, MqttClient.generateClientId());
MqttConnectOptions connOpts = new MqttConnectOptions(); // MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("t_user"); // connOpts.setUserName("t_user");
connOpts.setPassword("njcnpqs".toCharArray()); // connOpts.setPassword("njcnpqs".toCharArray());
//
mqttClient.connect(connOpts); // mqttClient.connect(connOpts);
//
// Read the file // // Read the file
File file = new File(FILE_PATH); // File file = new File(FILE_PATH);
FileInputStream fis = new FileInputStream(file); // FileInputStream fis = new FileInputStream(file);
byte[] fileContent = new byte[(int) file.length()]; // byte[] fileContent = new byte[(int) file.length()];
fis.read(fileContent); // fis.read(fileContent);
fis.close(); // fis.close();
//
// Create a new MQTT message // // Create a new MQTT message
MqttMessage message = new MqttMessage(fileContent); // MqttMessage message = new MqttMessage(fileContent);
//
// Set QoS level and retain flag as per your requirement // // Set QoS level and retain flag as per your requirement
message.setQos(1); // message.setQos(1);
// message.setRetained(false); //// message.setRetained(false);
//
// Record the start time // // Record the start time
long startTime = System.currentTimeMillis(); // long startTime = System.currentTimeMillis();
//
// Publish the message to the MQTT topic // // Publish the message to the MQTT topic
mqttClient.publish(MQTT_TOPIC, message); // mqttClient.publish(MQTT_TOPIC, message);
//
// Record the end time // // Record the end time
long endTime = System.currentTimeMillis(); // long endTime = System.currentTimeMillis();
//
System.out.println("File published successfully!"); // System.out.println("File published successfully!");
System.out.println("Time taken: " + (endTime - startTime) + " ms"); // System.out.println("Time taken: " + (endTime - startTime) + " ms");
} catch (MqttException | IOException e) { // } catch (MqttException | IOException e) {
e.printStackTrace(); // e.printStackTrace();
} finally { // } finally {
// Disconnect from the MQTT broker // // Disconnect from the MQTT broker
if (mqttClient != null && mqttClient.isConnected()) { // if (mqttClient != null && mqttClient.isConnected()) {
try { // try {
mqttClient.disconnect(); // mqttClient.disconnect();
} catch (MqttException e) { // } catch (MqttException e) {
e.printStackTrace(); // e.printStackTrace();
} // }
} // }
} // }
} // }
//
} //}

View File

@@ -21,6 +21,10 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>com.github.tocrhz</groupId>
<artifactId>mqtt-spring-boot-starter</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.njcn</groupId> <groupId>com.njcn</groupId>
<artifactId>common-web</artifactId> <artifactId>common-web</artifactId>

View File

@@ -20,6 +20,10 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>com.github.tocrhz</groupId>
<artifactId>mqtt-spring-boot-starter</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.njcn</groupId> <groupId>com.njcn</groupId>
<artifactId>common-web</artifactId> <artifactId>common-web</artifactId>

View File

@@ -3,18 +3,11 @@ package com.njcn.csharmonic.handler;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.shaded.com.google.gson.Gson; import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.tocrhz.mqtt.annotation.MqttSubscribe; import com.github.tocrhz.mqtt.annotation.MqttSubscribe;
import com.github.tocrhz.mqtt.annotation.NamedValue; import com.github.tocrhz.mqtt.annotation.NamedValue;
import com.github.tocrhz.mqtt.annotation.Payload; import com.github.tocrhz.mqtt.annotation.Payload;
import com.github.tocrhz.mqtt.publisher.MqttPublisher; import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.access.utils.FileCommonUtils;
import com.njcn.csdevice.api.DevCapacityFeignClient; import com.njcn.csdevice.api.DevCapacityFeignClient;
import com.njcn.csdevice.api.DeviceFtpFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
import com.njcn.csharmonic.param.CommonStatisticalQueryParam; import com.njcn.csharmonic.param.CommonStatisticalQueryParam;
import com.njcn.csharmonic.param.FrequencyStatisticalQueryParam; import com.njcn.csharmonic.param.FrequencyStatisticalQueryParam;
import com.njcn.csharmonic.pojo.dto.RealTimeDataDTO; import com.njcn.csharmonic.pojo.dto.RealTimeDataDTO;
@@ -26,8 +19,6 @@ import com.njcn.csharmonic.service.ILineTargetService;
import com.njcn.csharmonic.service.StableDataService; import com.njcn.csharmonic.service.StableDataService;
import com.njcn.csharmonic.service.TemperatureService; import com.njcn.csharmonic.service.TemperatureService;
import com.njcn.influx.pojo.dto.StatisticalDataDTO; import com.njcn.influx.pojo.dto.StatisticalDataDTO;
import com.njcn.oss.constant.OssPath;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil; import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.CsStatisticalSetFeignClient; import com.njcn.system.api.CsStatisticalSetFeignClient;
import com.njcn.system.pojo.po.EleEpdPqd; import com.njcn.system.pojo.po.EleEpdPqd;
@@ -56,25 +47,15 @@ import java.util.stream.Stream;
public class MqttMessageHandler { public class MqttMessageHandler {
private final MqttPublisher publisher; private final MqttPublisher publisher;
private final FileCommonUtils fileCommonUtils;
private final ILineTargetService lineTargetService; private final ILineTargetService lineTargetService;
private final CsStatisticalSetFeignClient csStatisticalSetFeignClient; private final CsStatisticalSetFeignClient csStatisticalSetFeignClient;
private final StableDataService stableDataService; private final StableDataService stableDataService;
private final RedisUtil redisUtil; private final RedisUtil redisUtil;
private final TemperatureService temperatureService; private final TemperatureService temperatureService;
private final DevCapacityFeignClient devCapacityFeignClient; private final DevCapacityFeignClient devCapacityFeignClient;
private final DecimalFormat df = new DecimalFormat("#0.000"); private final DecimalFormat df = new DecimalFormat("#0.000");
private final ChannelObjectUtil channelObjectUtil;
private final EquipmentFeignClient equipmentFeignClient;
private final CsTopicFeignClient csTopicFeignClient;
private final DeviceFtpFeignClient deviceFtpFeignClient;
private static Integer mid = 1;
private final FileFeignClient fileFeignClient; private final FileFeignClient fileFeignClient;
private final CsEventPOService csEventPOService;
CsEventPOService csEventPOService;
/** /**
* 实时数据应答 * 实时数据应答

View File

@@ -72,6 +72,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.influxdb.InfluxDB; import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints; import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point; import org.influxdb.dto.Point;
@@ -706,11 +707,23 @@ public class CsEventPOServiceImpl extends ServiceImpl<CsEventPOMapper, CsEventPO
WaveDataDTO waveDataDTO = this.analyseWave(eventId, iType); WaveDataDTO waveDataDTO = this.analyseWave(eventId, iType);
//数据筛选,如果是双路电压的话会存在2个波形数据 //数据筛选,如果是双路电压的话会存在2个波形数据
List<WaveDataDetail> waveDataDetails = WaveUtil.filterWaveData(waveDataDTO); List<WaveDataDetail> waveDataDetails = WaveUtil.filterWaveData(waveDataDTO);
String instantPath = wavePicComponent.generateInstantImageZl(waveDataDetails); //单通道处理
eventDetail.setInstantPics(instantPath); if (ObjectUtils.isNotEmpty(waveDataDetails) && waveDataDetails.size() == 2) {
if (StrUtil.isBlank(eventDetail.getRmsPics())) { String instantPath = wavePicComponent.generateImageShun(waveDataDTO,waveDataDetails);
String rmsPath = wavePicComponent.generateRmsImageZl(waveDataDetails); eventDetail.setInstantPics(instantPath);
eventDetail.setRmsPics(rmsPath); if (StrUtil.isBlank(eventDetail.getRmsPics())) {
String rmsPath = wavePicComponent.generateImageRms(waveDataDTO,waveDataDetails);
eventDetail.setRmsPics(rmsPath);
}
}
//双通道处理
else if (ObjectUtils.isNotEmpty(waveDataDetails) && waveDataDetails.size() == 4) {
String instantPath = wavePicComponent.generateInstantImageZl(waveDataDetails);
eventDetail.setInstantPics(instantPath);
if (StrUtil.isBlank(eventDetail.getRmsPics())) {
String rmsPath = wavePicComponent.generateRmsImageZl(waveDataDetails);
eventDetail.setRmsPics(rmsPath);
}
} }
this.updateById(eventDetail); this.updateById(eventDetail);
} }

View File

@@ -21,6 +21,10 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>com.github.tocrhz</groupId>
<artifactId>mqtt-spring-boot-starter</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.njcn</groupId> <groupId>com.njcn</groupId>
<artifactId>cs-system-api</artifactId> <artifactId>cs-system-api</artifactId>