判断mqtt客户端是否在线
This commit is contained in:
@@ -0,0 +1,29 @@
|
||||
package com.njcn.access.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.annotation.Order;
|
||||
|
||||
/**
|
||||
* 类的介绍:
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2023/7/14 15:16
|
||||
*/
|
||||
@Data
|
||||
@Configuration
|
||||
@Order(10)
|
||||
public class MqttInfo {
|
||||
|
||||
@Value("${mqtt.clientUrl}")
|
||||
private String url;
|
||||
|
||||
@Value("${mqtt.secretKeyUserName}")
|
||||
private String userName;
|
||||
|
||||
@Value("${mqtt.secretKeyPassword}")
|
||||
private String password;
|
||||
|
||||
}
|
||||
@@ -16,6 +16,7 @@ public enum AccessResponseEnum {
|
||||
*/
|
||||
NDID_NO_FIND("A0301", "nDid在平台端未找到或者已注册"),
|
||||
|
||||
MISSING_CLIENT("A0302","设备客户端不在线!"),
|
||||
MODEL_REPEAT("A0302", "模板重复,请勿重复录入!"),
|
||||
MODEL_NO_FIND("A0302", "模板不存在,请先录入模板数据!"),
|
||||
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.njcn.access.pojo.dto.mqtt;
|
||||
|
||||
import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 类的介绍:
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2023/7/17 9:09
|
||||
*/
|
||||
@Data
|
||||
public class MqttClientDto implements Serializable {
|
||||
|
||||
@SerializedName("clientid")
|
||||
private String clientId;
|
||||
|
||||
@SerializedName("username")
|
||||
private String userName;
|
||||
|
||||
private boolean connected;
|
||||
|
||||
}
|
||||
@@ -21,10 +21,8 @@ import com.njcn.access.service.ICsTopicService;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.common.utils.PubUtils;
|
||||
import com.njcn.csdevice.api.DevModelFeignClient;
|
||||
import com.njcn.csdevice.api.EquipmentFeignClient;
|
||||
import com.njcn.csdevice.pojo.po.CsDevModelPO;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.api.DicDataFeignClient;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
@@ -108,8 +106,6 @@ public class MqttMessageHandler {
|
||||
ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class);
|
||||
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
|
||||
if (Objects.equals(res.getType(),TypeEnum.TYPE_17.getCode())){
|
||||
//这边用redis缓存来判断是否接收响应
|
||||
redisUtil.saveByKeyWithExpire(nDid,AccessEnum.SUCCESS.getCode(),600L);
|
||||
//询问模板数据
|
||||
ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req();
|
||||
reqAndResParam.setMid(1);
|
||||
@@ -164,7 +160,6 @@ public class MqttMessageHandler {
|
||||
//todo 这边也是要调整的
|
||||
String key2 = "MODEL" + nDid;
|
||||
redisUtil.saveByKeyWithExpire(key2,modelId,600L);
|
||||
redisUtil.delete(nDid);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ import com.njcn.access.pojo.dto.ReqAndResDto;
|
||||
import com.njcn.access.pojo.po.CsGateway;
|
||||
import com.njcn.access.service.ICsDeviceService;
|
||||
import com.njcn.access.service.ICsGatewayService;
|
||||
import com.njcn.access.utils.MqttUtil;
|
||||
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
|
||||
import com.njcn.common.pojo.exception.BusinessException;
|
||||
import com.njcn.common.utils.PubUtils;
|
||||
@@ -23,6 +24,7 @@ import com.njcn.csdevice.pojo.param.CsLedgerParam;
|
||||
import com.njcn.csdevice.pojo.po.AppLineTopologyDiagramPO;
|
||||
import com.njcn.csdevice.pojo.po.CsDeviceUserPO;
|
||||
import com.njcn.csdevice.pojo.po.CsLinePO;
|
||||
import com.njcn.csdevice.pojo.po.MqttUser;
|
||||
import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO;
|
||||
import com.njcn.redis.utils.RedisUtil;
|
||||
import com.njcn.system.api.DicDataFeignClient;
|
||||
@@ -77,6 +79,8 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
|
||||
private final RedisUtil redisUtil;
|
||||
|
||||
private final MqttUtil mqttUtil;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = {Exception.class})
|
||||
public void devRegister(String nDid) {
|
||||
@@ -94,6 +98,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
|
||||
if (!Objects.equals(code, DicDataEnum.CONNECT_DEV.getCode())){
|
||||
throw new BusinessException(AccessResponseEnum.DEV_IS_NOT_ZL);
|
||||
}
|
||||
//3.判断客户端是否在线
|
||||
//mqttUtil.judgeClientOnline(nDid);
|
||||
boolean mqttClient = mqttUtil.judgeClientOnline("access-boot123456");
|
||||
if (!mqttClient){
|
||||
throw new BusinessException(AccessResponseEnum.MISSING_CLIENT);
|
||||
}
|
||||
//3.MQTT询问装置用的模板,并判断库中是否存在模板
|
||||
//存在则建立关系;不存在则告警出来
|
||||
SysDicTreePO dictData = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevModel()).getData();
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.njcn.access.utils;
|
||||
|
||||
import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken;
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
||||
import com.njcn.access.config.MqttInfo;
|
||||
import com.njcn.access.pojo.dto.mqtt.MqttClientDto;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import okhttp3.Credentials;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.Response;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.annotation.Order;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 类的介绍:
|
||||
*
|
||||
* @author xuyang
|
||||
* @version 1.0.0
|
||||
* @createTime 2023/7/14 14:44
|
||||
*/
|
||||
@Data
|
||||
@Order(100)
|
||||
@Configuration
|
||||
@AllArgsConstructor
|
||||
public class MqttUtil {
|
||||
|
||||
private final MqttInfo mqttInfo;
|
||||
|
||||
public boolean judgeClientOnline(String id) {
|
||||
boolean result = false;
|
||||
try {
|
||||
Gson gson = new Gson();
|
||||
OkHttpClient client = new OkHttpClient();
|
||||
Request request = new Request.Builder()
|
||||
.url(mqttInfo.getUrl() + "/api/v5/clients/" + id)
|
||||
.header("Content-Type", "application/json")
|
||||
.header("Authorization", Credentials.basic(mqttInfo.getUserName(), mqttInfo.getPassword()))
|
||||
.build();
|
||||
Response response = client.newCall(request).execute();
|
||||
response.body();
|
||||
MqttClientDto mqttClientDto = gson.fromJson(Objects.requireNonNull(response.body()).string(), new TypeToken<MqttClientDto>(){}.getType());
|
||||
result = mqttClientDto.isConnected();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user