diff --git a/iot-access/access-api/src/main/java/com/njcn/access/config/MqttInfo.java b/iot-access/access-api/src/main/java/com/njcn/access/config/MqttInfo.java new file mode 100644 index 0000000..aafd007 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/config/MqttInfo.java @@ -0,0 +1,29 @@ +package com.njcn.access.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/7/14 15:16 + */ +@Data +@Configuration +@Order(10) +public class MqttInfo { + + @Value("${mqtt.clientUrl}") + private String url; + + @Value("${mqtt.secretKeyUserName}") + private String userName; + + @Value("${mqtt.secretKeyPassword}") + private String password; + +} diff --git a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java index 266da3a..a741138 100644 --- a/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java +++ b/iot-access/access-api/src/main/java/com/njcn/access/enums/AccessResponseEnum.java @@ -16,6 +16,7 @@ public enum AccessResponseEnum { */ NDID_NO_FIND("A0301", "nDid在平台端未找到或者已注册"), + MISSING_CLIENT("A0302","设备客户端不在线!"), MODEL_REPEAT("A0302", "模板重复,请勿重复录入!"), MODEL_NO_FIND("A0302", "模板不存在,请先录入模板数据!"), diff --git a/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/mqtt/MqttClientDto.java b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/mqtt/MqttClientDto.java new file mode 100644 index 0000000..92e8425 --- /dev/null +++ b/iot-access/access-api/src/main/java/com/njcn/access/pojo/dto/mqtt/MqttClientDto.java @@ -0,0 +1,26 @@ +package com.njcn.access.pojo.dto.mqtt; + +import com.alibaba.nacos.shaded.com.google.gson.annotations.SerializedName; +import lombok.Data; + +import java.io.Serializable; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/7/17 9:09 + */ +@Data +public class MqttClientDto implements Serializable { + + @SerializedName("clientid") + private String clientId; + + @SerializedName("username") + private String userName; + + private boolean connected; + +} diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java index 03f89fd..66143f1 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/handler/MqttMessageHandler.java @@ -21,10 +21,8 @@ import com.njcn.access.service.ICsTopicService; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.utils.PubUtils; import com.njcn.csdevice.api.DevModelFeignClient; -import com.njcn.csdevice.api.EquipmentFeignClient; import com.njcn.csdevice.pojo.po.CsDevModelPO; import com.njcn.redis.utils.RedisUtil; -import com.njcn.system.api.DicDataFeignClient; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -108,8 +106,6 @@ public class MqttMessageHandler { ReqAndResDto.Res res = gson.fromJson(new String(message.getPayload(), StandardCharsets.UTF_8), ReqAndResDto.Res.class); if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){ if (Objects.equals(res.getType(),TypeEnum.TYPE_17.getCode())){ - //这边用redis缓存来判断是否接收响应 - redisUtil.saveByKeyWithExpire(nDid,AccessEnum.SUCCESS.getCode(),600L); //询问模板数据 ReqAndResDto.Req reqAndResParam = new ReqAndResDto.Req(); reqAndResParam.setMid(1); @@ -164,7 +160,6 @@ public class MqttMessageHandler { //todo 这边也是要调整的 String key2 = "MODEL" + nDid; redisUtil.saveByKeyWithExpire(key2,modelId,600L); - redisUtil.delete(nDid); } diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java index c1c2dd8..fe7aac1 100644 --- a/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java +++ b/iot-access/access-boot/src/main/java/com/njcn/access/service/impl/CsDeviceServiceImpl.java @@ -14,6 +14,7 @@ import com.njcn.access.pojo.dto.ReqAndResDto; import com.njcn.access.pojo.po.CsGateway; import com.njcn.access.service.ICsDeviceService; import com.njcn.access.service.ICsGatewayService; +import com.njcn.access.utils.MqttUtil; import com.njcn.common.pojo.enums.response.CommonResponseEnum; import com.njcn.common.pojo.exception.BusinessException; import com.njcn.common.utils.PubUtils; @@ -23,6 +24,7 @@ import com.njcn.csdevice.pojo.param.CsLedgerParam; import com.njcn.csdevice.pojo.po.AppLineTopologyDiagramPO; import com.njcn.csdevice.pojo.po.CsDeviceUserPO; import com.njcn.csdevice.pojo.po.CsLinePO; +import com.njcn.csdevice.pojo.po.MqttUser; import com.njcn.csdevice.pojo.vo.CsEquipmentDeliveryVO; import com.njcn.redis.utils.RedisUtil; import com.njcn.system.api.DicDataFeignClient; @@ -77,6 +79,8 @@ public class CsDeviceServiceImpl implements ICsDeviceService { private final RedisUtil redisUtil; + private final MqttUtil mqttUtil; + @Override @Transactional(rollbackFor = {Exception.class}) public void devRegister(String nDid) { @@ -94,6 +98,12 @@ public class CsDeviceServiceImpl implements ICsDeviceService { if (!Objects.equals(code, DicDataEnum.CONNECT_DEV.getCode())){ throw new BusinessException(AccessResponseEnum.DEV_IS_NOT_ZL); } + //3.判断客户端是否在线 + //mqttUtil.judgeClientOnline(nDid); + boolean mqttClient = mqttUtil.judgeClientOnline("access-boot123456"); + if (!mqttClient){ + throw new BusinessException(AccessResponseEnum.MISSING_CLIENT); + } //3.MQTT询问装置用的模板,并判断库中是否存在模板 //存在则建立关系;不存在则告警出来 SysDicTreePO dictData = dictTreeFeignClient.queryById(csEquipmentDeliveryVO.getDevModel()).getData(); diff --git a/iot-access/access-boot/src/main/java/com/njcn/access/utils/MqttUtil.java b/iot-access/access-boot/src/main/java/com/njcn/access/utils/MqttUtil.java new file mode 100644 index 0000000..b172ad9 --- /dev/null +++ b/iot-access/access-boot/src/main/java/com/njcn/access/utils/MqttUtil.java @@ -0,0 +1,53 @@ +package com.njcn.access.utils; + +import com.alibaba.nacos.shaded.com.google.common.reflect.TypeToken; +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.njcn.access.config.MqttInfo; +import com.njcn.access.pojo.dto.mqtt.MqttClientDto; +import lombok.AllArgsConstructor; +import lombok.Data; +import okhttp3.Credentials; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; + +import java.io.IOException; +import java.util.Objects; + +/** + * 类的介绍: + * + * @author xuyang + * @version 1.0.0 + * @createTime 2023/7/14 14:44 + */ +@Data +@Order(100) +@Configuration +@AllArgsConstructor +public class MqttUtil { + + private final MqttInfo mqttInfo; + + public boolean judgeClientOnline(String id) { + boolean result = false; + try { + Gson gson = new Gson(); + OkHttpClient client = new OkHttpClient(); + Request request = new Request.Builder() + .url(mqttInfo.getUrl() + "/api/v5/clients/" + id) + .header("Content-Type", "application/json") + .header("Authorization", Credentials.basic(mqttInfo.getUserName(), mqttInfo.getPassword())) + .build(); + Response response = client.newCall(request).execute(); + response.body(); + MqttClientDto mqttClientDto = gson.fromJson(Objects.requireNonNull(response.body()).string(), new TypeToken(){}.getType()); + result = mqttClientDto.isConnected(); + } catch (IOException e) { + e.printStackTrace(); + } + return result; + } +}