11 Commits

Author SHA1 Message Date
xy
6dcee1f6c3 refactor(event): 重构事件通知服务并优化异步处理
- 新增 AppNotificationService 处理应用通知推送逻辑
- 新增 SmsNotificationService 处理短信通知异步发送
- 添加 AsyncConfig 配置类定义事件和短信通知线程池
- 在 CsAlarmServiceImpl 中替换 sendEventUtils 为 appNotificationService
- 在 EventServiceImpl 中替换 sendEventUtils 为 appNotificationService 和 smsNotificationService
- 移除废弃的 SendEventUtils 工具类
- 优化暂降事件短信发送逻辑并支持幅值和持续时间参数
- 修复时序数据库时间处理注释说明,统一使用UTC时间转换
- 启用 Spring 异步注解支持事件和短信异步处理
2026-04-22 16:05:01 +08:00
xy
02d321d4d4 feat(mqtt): 集成 MQTT Spring Boot Starter
- 添加 tocrhz/mqtt-spring-boot-starter 依赖
- 更新项目依赖配置以支持 MQTT 功能
2026-04-21 19:22:00 +08:00
xy
f5b97d83b3 短信服务集成 2026-04-21 16:10:35 +08:00
xy
8041c5f27e refactor(iot-access): 重构设备离线通知和事件推送逻辑
- 移除过时的定时任务相关代码和依赖
- 集成新的DeviceMessageFeignClient服务接口
- 更新设备离线通知逻辑,增加推送客户端检查
- 修改事件用户获取方式,使用远程服务调用
- 优化推送用户筛选流程,统一使用DeviceMessageParam参数
- 清理冗余的导入包和废弃的方法
- 调整事件推送类型的参数传递方式
2026-04-17 16:20:06 +08:00
xy
48c79b721e 微调 2026-04-16 16:15:23 +08:00
xy
eaacce339e 测试 2026-04-14 19:27:35 +08:00
xy
fafcaf3bf0 Merge remote-tracking branch 'origin/master'
# Conflicts:
#	iot-access/access-boot/src/main/java/com/njcn/access/controller/CsDeviceController.java
2026-04-14 19:11:29 +08:00
xy
c2b48d6830 设备自动连接逻辑优化 2026-04-14 19:11:12 +08:00
xy
6b24e49651 设备自动连接逻辑优化 2026-04-08 10:18:07 +08:00
xy
b3d2727a64 设备接入优化 2026-04-08 09:26:52 +08:00
xy
441b5d04fe 微调 2026-04-07 14:22:13 +08:00
26 changed files with 576 additions and 352 deletions

View File

@@ -20,6 +20,10 @@
</properties>
<dependencies>
<dependency>
<groupId>com.github.tocrhz</groupId>
<artifactId>mqtt-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>common-core</artifactId>

View File

@@ -0,0 +1,20 @@
package com.njcn.access.api;
import com.njcn.access.api.fallback.CsDeviceClientFallbackFactory;
import com.njcn.common.pojo.constant.ServerInfo;
import com.njcn.common.pojo.response.HttpResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
/**
* @author xy
*/
@FeignClient(value = ServerInfo.ACCESS_BOOT, path = "/device", fallbackFactory = CsDeviceClientFallbackFactory.class,contextId = "device")
public interface CsDeviceFeignClient {
@PostMapping("/updateRunStatus")
HttpResult<String> updateRunStatus(@RequestParam("nDid") String nDid, @RequestParam("runStatus") Integer runStatus);
}

View File

@@ -0,0 +1,34 @@
package com.njcn.access.api.fallback;
import com.njcn.access.api.CsDeviceFeignClient;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.pojo.response.HttpResult;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author xy
*/
@Slf4j
@Component
public class CsDeviceClientFallbackFactory implements FallbackFactory<CsDeviceFeignClient> {
@Override
public CsDeviceFeignClient create(Throwable cause) {
//判断抛出异常是否为解码器抛出的业务异常
Enum<?> exceptionEnum = CommonResponseEnum.SERVICE_FALLBACK;
if (cause.getCause() instanceof BusinessException) {
BusinessException businessException = (BusinessException) cause.getCause();
}
Enum<?> finalExceptionEnum = exceptionEnum;
return new CsDeviceFeignClient() {
@Override
public HttpResult<String> updateRunStatus(String nDid, Integer runStatus) {
log.error("{}异常,降级处理,异常为:{}","云设备状态调整异常",cause.toString());
throw new BusinessException(finalExceptionEnum);
}
};
}
}

View File

@@ -30,7 +30,7 @@ public enum AccessResponseEnum {
DEV_MODEL_NOT_FIND("A0303","装置型号未找到!"),
DEV_IS_NOT_ZL("A0303","注册装置不是直连装置!"),
DEV_IS_NOT_WG("A0303","注册装置不是网关!"),
DEV_IS_NOT_PORTABLE("A0303","注册装置不是便携式装置!"),
DEV_IS_NOT_PORTABLE("A0303","注册装置不是便携式装置或者在线监测设备!"),
REGISTER_RESPONSE_ERROR("A0304","装置注册,装置侧应答失败!"),
ACCESS_RESPONSE_ERROR("A0304","装置注册,装置侧应答失败!"),

View File

@@ -3,7 +3,9 @@ package com.njcn.access.utils;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.access.pojo.dto.NoticeUserDto;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
@@ -21,12 +23,17 @@ import java.nio.charset.StandardCharsets;
@Component
@Slf4j
@AllArgsConstructor
@RequiredArgsConstructor
public class SendMessageUtil {
@Value("${app.sendUrl:https://fc-mp-ff7b310f-94c9-4468-8260-109111c0a6b2.next.bspapp.com/push}")
private String appSendUrl;
//App客户端消息推送
public void sendEventToUser(NoticeUserDto noticeUserDto) {
try {
// 创建一个URL对象指定目标HTTPS接口地址
URL url = new URL("https://fc-mp-ff7b310f-94c9-4468-8260-109111c0a6b2.next.bspapp.com/push");
URL url = new URL(appSendUrl);
// 打开HTTPS连接
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
// 设置请求方法为POST

View File

@@ -2,6 +2,7 @@ package com.njcn.access.controller;
import com.njcn.access.param.DevAccessParam;
import com.njcn.access.service.ICsDeviceService;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
@@ -34,6 +35,7 @@ import org.springframework.web.bind.annotation.*;
public class CsDeviceController extends BaseController {
private final ICsDeviceService csDeviceService;
private final ICsEquipmentDeliveryService csEquipmentDeliveryService;
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/register")
@@ -130,4 +132,32 @@ public class CsDeviceController extends BaseController {
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "success", methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/updateRunStatus")
@ApiOperation("设备状态调整")
@ApiImplicitParams({
@ApiImplicitParam(name = "nDid", value = "设备识别码", required = true),
@ApiImplicitParam(name = "runStatus", value = "状态", required = true)
})
public HttpResult<String> updateRunStatus(@RequestParam String nDid, @RequestParam Integer runStatus){
String methodDescribe = getMethodDescribe("updateRunStatus");
csEquipmentDeliveryService.updateRunStatusBynDid(nDid,runStatus);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, "success", methodDescribe);
}
@OperateInfo(info = LogEnum.BUSINESS_COMMON)
@PostMapping("/onlineRegister")
@ApiOperation("监测设备接入")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectId", value = "项目id", required = true),
@ApiImplicitParam(name = "nDid", value = "设备识别码", required = true)
})
@ReturnMsg
public HttpResult<String> onlineRegister(@RequestParam("projectId") String projectId,@RequestParam("nDid") String nDid){
String methodDescribe = getMethodDescribe("onlineRegister");
String result = csDeviceService.onlineRegister(projectId,nDid);
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS, result, methodDescribe);
}
}

View File

@@ -32,7 +32,7 @@ import java.util.List;
@Slf4j
@RestController
@RequestMapping("/csLineLatestData")
@Api(tags = "暂降事件")
@Api(tags = "治理设备模块运行状态记录")
@AllArgsConstructor
public class CsLineLatestDataController extends BaseController {

View File

@@ -312,7 +312,7 @@ public class MqttMessageHandler {
if (Objects.equals(res.getCode(),AccessEnum.SUCCESS.getCode())){
int mid = 1;
//修改装置状态
csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.ACCESS.getCode());
csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.ACCESS.getCode(),null,null);
csEquipmentDeliveryService.updateRunStatusBynDid(nDid,AccessEnum.ONLINE.getCode());
//记录设备上线
PqsCommunicateDto dto = new PqsCommunicateDto();

View File

@@ -1,12 +1,12 @@
package com.njcn.access.listener;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.dto.NoticeUserDto;
import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
import com.njcn.access.service.ICsDeviceOnlineLogsService;
import com.njcn.access.service.ICsEquipmentDeliveryService;
import com.njcn.access.service.impl.CsDeviceServiceImpl;
@@ -15,12 +15,14 @@ import com.njcn.access.utils.RedisSetUtil;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.common.pojo.dto.DeviceLogDTO;
import com.njcn.csdevice.api.*;
import com.njcn.csdevice.param.DeviceMessageParam;
import com.njcn.csdevice.pojo.dto.DevDetailDTO;
import com.njcn.csdevice.pojo.dto.PqsCommunicateDto;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.rt.pojo.dto.BaseRealDataSet;
import com.njcn.user.api.AppInfoSetFeignClient;
import com.njcn.user.api.AppUserFeignClient;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
@@ -37,9 +39,6 @@ import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -81,6 +80,11 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
private MqttPublisher publisher;
@Resource
private RedisSetUtil redisSetUtil;
@Resource
private AppInfoSetFeignClient appInfoSetFeignClient;
@Resource
private DeviceMessageFeignClient deviceMessageFeignClient;
private final Object lock = new Object();
@@ -140,7 +144,7 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
//装置下线
csEquipmentDeliveryService.updateRunStatusBynDid(nDid, AccessEnum.OFFLINE.getCode());
//装置调整为注册状态
csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode());
csEquipmentDeliveryService.updateStatusBynDid(nDid,AccessEnum.REGISTERED.getCode(),null,null);
logDto.setOperate(nDid +"装置离线");
sendMessage(nDid);
//记录装置掉线时间
@@ -151,68 +155,8 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
dto.setDescription("通讯中断");
csCommunicateFeignClient.insertion(dto);
csLogsFeignClient.addUserLog(logDto);
}
private void startScheduledTask(ScheduledExecutorService scheduler, String nDid, String version) {
synchronized (lock) {
//判断是否推送消息
boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData();
if (devModel) {
NoticeUserDto dto = sendOffLine(nDid);
sendMessageUtil.sendEventToUser(dto);
addLogs(dto);
}
String clientName = "NJCN-" + nDid.substring(nDid.length() - 6);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
log.info(nDid + "执行重连定时任务...");
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setOperate(nDid + "重连定时任务");
//判断客户端
boolean mqttClient = mqttUtil.judgeClientOnline(clientName);
if (mqttClient) {
csDeviceService.devAccessAskTemplate(nDid,version,1);
try {
Thread.sleep(5000);
Integer status = csEquipmentDeliveryService.queryEquipmentBynDid(nDid).getRunStatus();
if (Objects.equals(status,AccessEnum.ONLINE.getCode())){
logDto.setResult(1);
scheduler.shutdown();
redisUtil.saveByKey(AppRedisKey.DEVICE_MID + nDid,1);
return;
} else {
logDto.setResult(0);
//一个小时未连接上,则推送告警消息
MAX_WARNING_TIMES++;
if (MAX_WARNING_TIMES == 30 && devModel) {
NoticeUserDto dto2 = sendConnectFail(nDid);
sendMessageUtil.sendEventToUser(dto2);
addLogs(dto2);
}
//记录装置掉线时间
CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid);
record.setOfflineTime(LocalDateTime.now());
onlineLogsService.updateById(record);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
//一个小时未连接上,则推送告警消息
MAX_WARNING_TIMES++;
if (MAX_WARNING_TIMES == 30 && devModel) {
NoticeUserDto dto2 = sendConnectFail(nDid);
sendMessageUtil.sendEventToUser(dto2);
addLogs(dto2);
}
logDto.setResult(0);
//记录装置掉线时间
CsDeviceOnlineLogs record = onlineLogsService.findLastData(nDid);
record.setOfflineTime(LocalDateTime.now());
onlineLogsService.updateById(record);
}
csLogsFeignClient.addUserLog(logDto);
}, 0, 2, TimeUnit.MINUTES);
}
//清空缓存
redisUtil.deleteKeysByString(AppRedisKey.LINE_POSITION+nDid);
}
//判断设备型号发送数据
@@ -220,8 +164,10 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData();
if (devModel) {
NoticeUserDto dto = sendOffLine(nDid);
sendMessageUtil.sendEventToUser(dto);
addLogs(dto);
if (CollectionUtil.isNotEmpty(dto.getPushClientId())) {
sendMessageUtil.sendEventToUser(dto);
addLogs(dto);
}
}
}
@@ -236,22 +182,17 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
String dateStr = localDateTime.format(fmt);
String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + dateStr + "离线");
dto.setContent(content);
dto.setPushClientId(getEventUser(po.getId(),true));
return dto;
}
//重连失败通知
private NoticeUserDto sendConnectFail(String nDid) {
NoticeUserDto dto = new NoticeUserDto();
dto.setTitle("设备接入失败");
CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(nDid).getData();
DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(po.getId()).getData();
DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime localDateTime = LocalDateTime.now();
String dateStr = localDateTime.format(fmt);
String content = String.format(devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + dateStr + "多次接入失败");
dto.setContent(content);
dto.setPushClientId(getEventUser(po.getId(),false));
//获取设备关联的用户
List<String> eventUser = deviceMessageFeignClient.getEventUserByDeviceId(po.getId(),true).getData();
DeviceMessageParam param1 = new DeviceMessageParam();
param1.setUserList(eventUser);
param1.setEventType(2);
//获取打开推送的用户
List<User> users = deviceMessageFeignClient.getSendUserByType(param1).getData();
if (CollectionUtil.isNotEmpty(users)){
dto.setPushClientId(
users.stream().filter(Objects::nonNull).map(User::getDevCode).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList()));
}
return dto;
}
@@ -263,18 +204,4 @@ public class RedisKeyExpirationListener extends KeyExpirationEventMessageListene
dto.setOperate(noticeUserDto.getContent());
csLogsFeignClient.addUserLog(dto);
}
/**
* 获取所有需要推送的用户id
*/
public List<String> getEventUser(String devId, boolean isAdmin) {
List<User> adminUser = appUserFeignClient.getAdminInfo().getData();
List<String> adminList = adminUser.stream().map(User::getId).collect(Collectors.toList());
if (isAdmin) {
List<String> list = csDeviceUserFeignClient.findUserById(devId).getData();
adminList.addAll(list);
}
List<User> users = userFeignClient.appuserByIdList(adminList).getData();
return users.stream().map(User::getDevCode).filter(Objects::nonNull).filter(StringUtils::isNotBlank).distinct().collect(Collectors.toList());
}
}

View File

@@ -56,4 +56,6 @@ public interface ICsDeviceService {
void wlAccess(String nDid);
String autoPortableLedger();
String onlineRegister(String projectId,String nDid);
}

View File

@@ -22,7 +22,7 @@ public interface ICsEquipmentDeliveryService extends IService<CsEquipmentDeliver
* 根据网关id修改装置的状态
* @param nDid 网关id
*/
void updateStatusBynDid(String nDid,Integer status);
void updateStatusBynDid(String nDid,Integer status,String engineeringId, String projectId);
/**
* 根据网关id修改软件信息

View File

@@ -30,6 +30,7 @@ import com.njcn.redis.utils.RedisUtil;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.api.DictTreeFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.DictData;
import com.njcn.system.pojo.po.SysDicTreePO;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.enums.AppRoleEnum;
@@ -190,6 +191,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
@Override
@Transactional(rollbackFor = {Exception.class})
//fixme 这边事务不起作用,中途出错会导致数据部分录入,再次接入会报主键冲突,所以暂时加了个重置按钮,清空台账数据的
public void devAccess(DevAccessParam devAccessParam) {
//日志实体
DeviceLogDTO logDto = new DeviceLogDTO();
@@ -286,6 +288,23 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
}
csLineService.saveBatch(csLinePoList);
redisUtil.saveByKeyWithExpire("accessLineInfo:" + devAccessParam.getNDid(),csLinePoList,30L);
//缓存监测点信息
Map<Integer,String> map = new HashMap<>();
for (CsLinePO item : csLinePoList) {
if (Objects.isNull(item.getPosition())){
map.put(item.getClDid(),item.getLineId());
} else {
DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData();
if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){
map.put(0,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){
map.put(1,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){
map.put(2,item.getLineId());
}
}
}
redisUtil.saveByKey(AppRedisKey.LINE_POSITION+devAccessParam.getNDid(),map);
//4.监测点拓扑图表录入关系
appLineTopologyDiagramService.saveBatch(appLineTopologyDiagramPoList);
//5.绑定装置和人的关系
@@ -295,8 +314,8 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
po.setSubUserId(RequestUtil.getUserIndex());
po.setDeviceId(vo.getId());
csDeviceUserService.saveBatch(Collections.singletonList(po));
//6.修改装置状态
csEquipmentDeliveryService.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode());
//6.修改装置状态;修改设备接入的工程、项目
csEquipmentDeliveryService.updateStatusBynDid(devAccessParam.getNDid(), AccessEnum.REGISTERED.getCode(),devAccessParam.getEngineeringId(), devAccessParam.getProjectId());
//7.发起自动接入请求
devAccessAskTemplate(devAccessParam.getNDid(),version,1);
//8.删除redis监测点模板信息
@@ -323,10 +342,6 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
csMarketDataFeignClient.insertData(userVO.getId(), devAccessParam.getEngineeringId());
}
}
//12.如果设备接入,发现接入的工程项目和之前预设的工程项目不一致,则需要更新原来的预设,使用接入的工程项目
if (!Objects.equals(vo.getAssociatedEngineering(),devAccessParam.getEngineeringId()) || !Objects.equals(vo.getAssociatedProject(),devAccessParam.getProjectId())) {
equipmentFeignClient.updateLedger(devAccessParam.getNDid(), devAccessParam.getEngineeringId(), devAccessParam.getProjectId());
}
} catch (Exception e) {
logDto.setResult(0);
logDto.setFailReason(e.getMessage());
@@ -384,6 +399,8 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
appLineTopologyDiagramPOQueryWrapper.in("line_id",collect);
appLineTopologyDiagramService.remove(appLineTopologyDiagramPOQueryWrapper);
}
//清空缓存
redisUtil.deleteKeysByString(AppRedisKey.LINE_POSITION+nDid);
}
@Override
@@ -459,6 +476,23 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
});
csLineService.saveBatch(csLinePoList);
redisUtil.saveByKeyWithExpire("accessLineInfo:" + nDid,csLinePoList,30L);
//缓存监测点信息
Map<Integer,String> map = new HashMap<>();
for (CsLinePO item : csLinePoList) {
if (Objects.isNull(item.getPosition())){
map.put(item.getClDid(),item.getLineId());
} else {
DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData();
if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){
map.put(0,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){
map.put(1,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){
map.put(2,item.getLineId());
}
}
}
redisUtil.saveByKey(AppRedisKey.LINE_POSITION+nDid,map);
//4.生成装置和模板的关系表
CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm();
csDevModelRelationAddParm.setDevId(vo.getId());
@@ -528,13 +562,138 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
param.setDescription("便携式项目");
param.setTopoIds(Collections.singletonList("99ed9b9c8cf9007cc4d2ac4c7127b7e4"));
param.setSort(Integer.MAX_VALUE);
csProjectPO = appProjectFeignClient.addAppProject(param).getData();
csProjectPO = appProjectFeignClient.addPortableProject(param).getData();
}
//修改已存在的便携式设备
csLedgerService.updatePortableLedger(csEngineeringPO.getId(),csProjectPO.getId());
return csProjectPO.getId();
}
@Override
@Transactional(rollbackFor = Exception.class)
public String onlineRegister(String projectId,String nDid) {
String result = "fail";
// 根据模板接入设备
DeviceLogDTO logDto = new DeviceLogDTO();
logDto.setUserName(RequestUtil.getUserNickname());
logDto.setLoginName(RequestUtil.getUsername());
logDto.setOperate("监测设备"+nDid+"注册、接入");
logDto.setResult(1);
try {
// 设备状态判断
checkDeviceStatus(nDid);
// 询问设备支持的主题信息,并将支持的主题入库
askAndStoreTopics(nDid);
Thread.sleep(2000);
// MQTT询问装置用的模板并判断库中是否存在模板
checkDeviceModel(nDid);
Thread.sleep(2000);
CsEquipmentDeliveryVO vo = equipmentFeignClient.queryEquipmentByndid(nDid).getData();
List<CsLinePO> csLinePoList = new ArrayList<>();
//1.录入装置台账信息
//新增监测设备
CsLedgerParam csLedgerParam = new CsLedgerParam();
csLedgerParam.setId(vo.getId());
csLedgerParam.setPid(projectId);
csLedgerParam.setName(vo.getName());
csLedgerParam.setLevel(2);
csLedgerParam.setSort(0);
csLedgerService.addLedgerTree(csLedgerParam);
//2.根据模板获取监测点个数,插入监测点表
Thread.sleep(2000);
List<CsModelDto> modelList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.MODEL + nDid),CsModelDto.class);
if (CollUtil.isEmpty(modelList)) {
throwExceptionAndLog(nDid,AccessResponseEnum.MODEL_ERROR, logDto);
}
List<CsDataSet> list = csDataSetService.getDataSetData(modelList.get(0).getModelId());
list.forEach(item->{
CsLinePO po = new CsLinePO();
po.setLineId(nDid + item.getClDev().toString());
po.setName(item.getClDev().toString() + "#监测点");
po.setStatus(1);
po.setClDid(item.getClDev());
po.setLineNo(item.getClDev());
po.setRunStatus(0);
po.setDeviceId(vo.getId());
po.setDataSetId(item.getId());
po.setDataModelId(item.getPid());
//防止主键重复
QueryWrapper<CsLinePO> qw = new QueryWrapper<>();
qw.eq("line_id",po.getLineId());
if(csLineService.getBaseMapper().selectList(qw).isEmpty()){
csLinePoList.add(po);
}
//3.生成台账树监测点数据
CsLedgerParam param = new CsLedgerParam();
param.setId(nDid + item.getClDev().toString());
param.setPid(vo.getId());
param.setName(item.getClDev().toString() + "#监测点");
param.setLevel(3);
param.setSort(0);
csLedgerService.addLedgerTree(param);
});
csLineService.saveBatch(csLinePoList);
redisUtil.saveByKeyWithExpire("accessLineInfo:" + nDid,csLinePoList,30L);
//缓存监测点信息
//缓存监测点信息
Map<Integer,String> map = new HashMap<>();
for (CsLinePO item : csLinePoList) {
if (Objects.isNull(item.getPosition())){
map.put(item.getClDid(),item.getLineId());
} else {
DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData();
if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){
map.put(0,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){
map.put(1,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){
map.put(2,item.getLineId());
}
}
}
redisUtil.saveByKey(AppRedisKey.LINE_POSITION+nDid,map);
//4.生成装置和模板的关系表
CsDevModelRelationAddParm csDevModelRelationAddParm = new CsDevModelRelationAddParm();
csDevModelRelationAddParm.setDevId(vo.getId());
csDevModelRelationAddParm.setModelId(modelList.get(0).getModelId());
csDevModelRelationAddParm.setDid(modelList.get(0).getDid());
csDevModelRelationService.addDevModelRelation(csDevModelRelationAddParm);
//5.绑定装置和人的关系
CsDeviceUserPO po = new CsDeviceUserPO();
po.setPrimaryUserId(RequestUtil.getUserIndex());
po.setStatus("1");
po.setSubUserId(RequestUtil.getUserIndex());
po.setDeviceId(vo.getId());
csDeviceUserService.saveBatch(Collections.singletonList(po));
//发起自动接入请求
Thread.sleep(2000);
//先获取版本
String version = "V1";
devAccessAskTemplate(nDid,version,1);
//6.修改流程,接入成功即为实际环境
csEquipmentDeliveryService.updateProcessBynDid(nDid,4);
//7.存储日志
csLogsFeignClient.addUserLog(logDto);
//9.删除redis监测点模板信息
redisUtil.delete(AppRedisKey.MODEL + nDid);
redisUtil.delete(AppRedisKey.LINE + nDid);
//判断接入状态
Thread.sleep(5000);
Object object = redisUtil.getObjectByKey("online" + nDid);
if (Objects.nonNull(object)) {
result = "success";
}
} catch (Exception e) {
logDto.setResult(0);
logDto.setFailReason(e.getMessage());
csLogsFeignClient.addUserLog(logDto);
resetFactory(nDid);
throw new BusinessException(e.getMessage());
}
return result;
}
private void checkDeviceStatus(String nDid) {
DeviceLogDTO logDto = createLogDto("当前设备"+nDid+"状态判断");
CsEquipmentDeliveryVO csEquipmentDeliveryVO = csEquipmentDeliveryService.queryEquipmentBynDid(nDid);
@@ -546,7 +705,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
throwExceptionAndLog(nDid,AccessResponseEnum.DEV_NOT_FIND, logDto);
}
String code = sysDicTreePo.getCode();
if (!Objects.equals(code, DicDataEnum.PORTABLE.getCode())) {
if (!Objects.equals(code, DicDataEnum.PORTABLE.getCode()) && !Objects.equals(code, DicDataEnum.DEV_CLD.getCode())) {
throwExceptionAndLog(nDid,AccessResponseEnum.DEV_IS_NOT_PORTABLE, logDto);
}
if (!mqttUtil.judgeClientOnline("NJCN-" + nDid.substring(nDid.length() - 6))) {
@@ -778,6 +937,7 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
List<CsLinePO> lineList;
Object object = redisUtil.getObjectByKey("accessLineInfo:" + nDid);
if (Objects.isNull(object)) {
Map<Integer,String> map = new HashMap<>();
lineList = csLineFeignClient.findByNdid(nDid).getData();
for (CsLinePO item : lineList) {
if (item.getClDid() == 0) {
@@ -785,10 +945,23 @@ public class CsDeviceServiceImpl implements ICsDeviceService {
} else {
updateLineIds(modelMap.get(1), item.getClDid(), nDid);
}
//缓存监测点信息
if (Objects.isNull(item.getPosition())){
map.put(item.getClDid(),item.getLineId());
} else {
DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData();
if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){
map.put(0,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){
map.put(1,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){
map.put(2,item.getLineId());
}
}
}
redisUtil.saveByKey(AppRedisKey.LINE_POSITION+nDid,map);
}
publisher.send("/Pfm/DevCmd/"+version+"/"+nDid, new Gson().toJson(getJson(mid,TypeEnum.TYPE_5.getCode())), 1, false);
// redisUtil.saveByKeyWithExpire("startFile:" + nDid, null, 60L);
result = true;
} catch (Exception e) {
DeviceLogDTO logDto = new DeviceLogDTO();

View File

@@ -41,9 +41,15 @@ public class CsEquipmentDeliveryServiceImpl extends ServiceImpl<CsEquipmentDeliv
private final CsLogsFeignClient csLogsFeignClient;
@Override
public void updateStatusBynDid(String nDid,Integer status) {
public void updateStatusBynDid(String nDid,Integer status,String engineeringId, String projectId) {
LambdaUpdateWrapper<CsEquipmentDeliveryPO> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.set(CsEquipmentDeliveryPO::getStatus,status).eq(CsEquipmentDeliveryPO::getNdid,nDid);
if (engineeringId != null && !engineeringId.isEmpty()) {
lambdaUpdateWrapper.set(CsEquipmentDeliveryPO::getAssociatedEngineering,engineeringId);
}
if (projectId != null && !projectId.isEmpty()) {
lambdaUpdateWrapper.set(CsEquipmentDeliveryPO::getAssociatedProject,projectId);
}
this.update(lambdaUpdateWrapper);
}

View File

@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
public class CsLedgerServiceImpl extends ServiceImpl<CsLedgerMapper, CsLedger> implements ICsLedgerService {
@Override
@Transactional(rollbackFor = {Exception.class})
public CsLedger addLedgerTree(CsLedgerParam csLedgerParam) {
CsLedger fatherCsLedger = this.lambdaQuery().eq(CsLedger::getId,csLedgerParam.getPid()).eq(CsLedger::getState,1).one();
CsLedger csLedger = new CsLedger();

View File

@@ -299,12 +299,12 @@ public class RtServiceImpl implements IRtService {
baseRealDataSet.setSB(FloatUtils.get2Float(map.get("Pq_SB")));
baseRealDataSet.setSC(FloatUtils.get2Float(map.get("Pq_SC")));
baseRealDataSet.setSTot(FloatUtils.get2Float(map.get("Pq_TotSM")));
//功率因数
//视在功率因数
baseRealDataSet.setPfA(FloatUtils.get2Float(map.get("Pq_PFA")));
baseRealDataSet.setPfB(FloatUtils.get2Float(map.get("Pq_PFB")));
baseRealDataSet.setPfC(FloatUtils.get2Float(map.get("Pq_PFC")));
baseRealDataSet.setPfTot(FloatUtils.get2Float(map.get("Pq_TotPFM")));
//基波功率因数
//位移功率因数
baseRealDataSet.setDpfA(FloatUtils.get2Float(map.get("Pq_DFA")));
baseRealDataSet.setDpfB(FloatUtils.get2Float(map.get("Pq_DFB")));
baseRealDataSet.setDpfC(FloatUtils.get2Float(map.get("Pq_DFC")));

View File

@@ -2,13 +2,16 @@ package com.njcn.stat.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.njcn.access.api.CsDeviceFeignClient;
import com.njcn.access.api.CsLineLatestDataFeignClient;
import com.njcn.access.enums.AccessEnum;
import com.njcn.access.pojo.po.CsLineLatestData;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.common.utils.PubUtils;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DataArrayFeignClient;
import com.njcn.csdevice.api.DeviceMessageFeignClient;
import com.njcn.csdevice.pojo.param.DataArrayParam;
import com.njcn.csdevice.pojo.po.CsDataArray;
import com.njcn.csdevice.pojo.po.CsEquipmentDeliveryPO;
@@ -51,12 +54,12 @@ import java.util.concurrent.TimeUnit;
public class StatServiceImpl implements IStatService {
private final DataArrayFeignClient dataArrayFeignClient;
private final DicDataFeignClient dicDataFeignClient;
private final InfluxDbUtils influxDbUtils;
private final CsLineFeignClient csLineFeignClient;
private final RedisUtil redisUtil;
private final ChannelObjectUtil channelObjectUtil;
private final CsLineLatestDataFeignClient csLineLatestDataFeignClient;
private final CsDeviceFeignClient csDeviceFeignClient;
private final DeviceMessageFeignClient deviceMessageFeignClient;
private static final Map<String, String> PHASE_MAPPING = new HashMap<String, String>() {{
put("AB", "A");
put("BC", "B");
@@ -83,7 +86,7 @@ public class StatServiceImpl implements IStatService {
String lineId = null;
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appAutoDataMessage.getId());
if (Objects.isNull(object1)){
lineInfo(appAutoDataMessage.getId());
deviceMessageFeignClient.getLineInfo(appAutoDataMessage.getId());
}
//获取当前设备信息判断装置型号,来筛选监测点
List<CsEquipmentDeliveryPO> poList = channelObjectUtil.objectToList(redisUtil.getObjectByKey(AppRedisKey.DEVICE_LIST),CsEquipmentDeliveryPO.class);
@@ -106,7 +109,6 @@ public class StatServiceImpl implements IStatService {
//云前置设备
else if (Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)) {
lineId = appAutoDataMessage.getId() + appAutoDataMessage.getMsg().getClDid();
}
//获取当前设备信息
@@ -133,7 +135,8 @@ public class StatServiceImpl implements IStatService {
default:
break;
}
int clDid = Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)?1:appAutoDataMessage.getMsg().getClDid();
boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD");
int clDid = flag?1:appAutoDataMessage.getMsg().getClDid();
String key = AppRedisKey.DEV_MODEL.concat(dataArrayParam.getId() + dataArrayParam.getDid() + clDid + dataArrayParam.getStatMethod() + dataArrayParam.getIdx());
Object object = redisUtil.getObjectByKey(key);
List<CsDataArray> dataArrayList;
@@ -142,10 +145,11 @@ public class StatServiceImpl implements IStatService {
} else {
dataArrayList = objectToList(object);
}
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess(),code);
List<String> result = assembleData(lineId,dataArrayList,item,appAutoDataMessage.getMsg().getClDid(),dataArrayParam.getStatMethod(),po.getProcess(),code,po.getDevAccessMethod());
recordList.addAll(result);
//获取时间
long devTime = Objects.equals(DicDataEnum.DEV_CLD.getCode(),code)?item.getDataTimeSec():item.getDataTimeSec()-8*3600;
boolean timeFlag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), code) && Objects.equals(po.getDevAccessMethod(), "CLD");
long devTime = timeFlag?item.getDataTimeSec():item.getDataTimeSec()-8*3600;
time = Instant.ofEpochSecond(devTime)
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
@@ -159,34 +163,12 @@ public class StatServiceImpl implements IStatService {
csLineLatestData.setTimeId(Objects.isNull(time) ? LocalDateTime.now() : time);
csLineLatestDataFeignClient.addData(csLineLatestData);
}
}
System.gc();
}
/**
* 缓存监测点相关信息
*/
public void lineInfo(String id) {
Map<Integer,String> map = new HashMap<>();
List<CsLinePO> lineList = csLineFeignClient.findByNdid(id).getData();
if (CollectionUtil.isEmpty(lineList)){
throw new BusinessException(StatResponseEnum.LINE_NULL);
}
for (CsLinePO item : lineList) {
if (Objects.isNull(item.getPosition())){
map.put(item.getClDid(),item.getLineId());
} else {
DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData();
if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){
map.put(0,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){
map.put(1,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){
map.put(2,item.getLineId());
}
//判断设备运行状态
if (!Objects.isNull(po.getRunStatus()) && po.getRunStatus() == 1) {
csDeviceFeignClient.updateRunStatus(appAutoDataMessage.getId(), AccessEnum.ONLINE.getCode());
}
}
redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_POSITION+id,map,30L);
System.gc();
}
/**
@@ -205,7 +187,7 @@ public class StatServiceImpl implements IStatService {
/**
* influxDB数据组装
*/
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType) {
public List<String> assembleData(String lineId,List<CsDataArray> dataArrayList,AppAutoDataMessage.DataArray item,Integer clDid,String statMethod,Integer process,String devType,String accessMethod) {
List<String> records = new ArrayList<String>();
//解码
List<Float> floats = PubUtils.byteArrayToFloatList(Base64.getDecoder().decode(item.getData()));
@@ -232,8 +214,9 @@ public class StatServiceImpl implements IStatService {
//这边特殊处理如果数据为3.14159则将数据置为null
fields.put(dataArrayList.get(i).getName(),Objects.equals(floats.get(i),3.14159f) ? null:floats.get(i));
fields.put(InfluxDBTableConstant.IS_ABNORMAL,item.getDataTag());
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时
Point point = influxDbUtils.pointBuilder(tableName, Objects.equals(DicDataEnum.DEV_CLD.getCode(),devType)?item.getDataTimeSec():item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
//fixme 设备上送的是北京时间,时序数据库录入时 需要utc时间减去8小时
boolean flag = Objects.equals(DicDataEnum.DEV_CLD.getCode(), devType) && Objects.equals(accessMethod, "CLD");
Point point = influxDbUtils.pointBuilder(tableName, flag?item.getDataTimeSec():item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
records.add(batchPoints.lineProtocol());

View File

@@ -73,6 +73,11 @@
<artifactId>system-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>cs-system-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>cs-harmonic-api</artifactId>
@@ -99,6 +104,11 @@
<artifactId>event-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.7.12</version>
</dependency>
</dependencies>
<build>

View File

@@ -6,6 +6,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.annotation.EnableAsync;
/**
@@ -18,6 +19,7 @@ import org.springframework.context.annotation.DependsOn;
@MapperScan("com.njcn.**.mapper")
@EnableFeignClients(basePackages = "com.njcn")
@SpringBootApplication(scanBasePackages = "com.njcn")
@EnableAsync
public class ZlEventBootApplication {
public static void main(String[] args) {

View File

@@ -0,0 +1,42 @@
package com.njcn.zlevent.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author xy
*/
@Configuration
@Slf4j
public class AsyncConfig implements AsyncConfigurer {
@Bean("eventNotificationExecutor")
public Executor eventNotificationExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("event-notify-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("smsNotificationExecutor")
public Executor smsNotificationExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("sms-notify-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}

View File

@@ -1,28 +1,24 @@
package com.njcn.zlevent.utils;
package com.njcn.zlevent.service;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import com.njcn.access.pojo.dto.NoticeUserDto;
import com.njcn.access.utils.SendMessageUtil;
import com.njcn.csdevice.api.CsDeviceUserFeignClient;
import com.njcn.csdevice.api.CsLedgerFeignClient;
import com.njcn.csdevice.api.DeviceMessageFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.api.EventLogsFeignClient;
import com.njcn.csdevice.param.DeviceMessageParam;
import com.njcn.csdevice.pojo.dto.DevDetailDTO;
import com.njcn.csdevice.pojo.po.CsEventSendMsg;
import com.njcn.csharmonic.pojo.po.CsEventUserPO;
import com.njcn.system.api.EpdFeignClient;
import com.njcn.user.api.AppInfoSetFeignClient;
import com.njcn.user.api.AppUserFeignClient;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
import com.njcn.user.pojo.po.app.AppInfoSet;
import com.njcn.zlevent.service.ICsEventUserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
@@ -30,48 +26,25 @@ import java.util.Objects;
import java.util.stream.Collectors;
/**
* 类的介绍
*
* @author xuyang
* @version 1.0.0
* @createTime 2023/9/25 16:08
* @author xy
*/
@Service
@Slf4j
@Component
public class SendEventUtils {
@RequiredArgsConstructor
public class AppNotificationService {
@Resource
private UserFeignClient userFeignClient;
@Resource
private AppUserFeignClient appUserFeignClient;
@Resource
private CsDeviceUserFeignClient csDeviceUserFeignClient;
@Resource
private AppInfoSetFeignClient appInfoSetFeignClient;
@Resource
private EventLogsFeignClient eventLogsFeignClient;
@Resource
private EpdFeignClient epdFeignClient;
@Resource
private ICsEventUserService csEventUserService;
@Resource
private CsLedgerFeignClient csLedgerFeignclient;
@Resource
private SendMessageUtil sendMessageUtil;
@Resource
private EquipmentFeignClient equipmentFeignClient;
private final DeviceMessageFeignClient deviceMessageFeignClient;
private final EquipmentFeignClient equipmentFeignClient;
private final EpdFeignClient epdFeignClient;
private final ICsEventUserService csEventUserService;
private final SendMessageUtil sendMessageUtil;
private final EventLogsFeignClient eventLogsFeignClient;
private final CsLedgerFeignClient csLedgerFeignClient;
/**
* 事件推送给相关用户
* @param eventType 事件类型 1:事件 2:告警
* @param type 等级 事件分为设备事件暂态事件稳态事件 告警分为级告警级告警级告警
* @param devId 设备id
* @param eventName 事件名称
* @param eventTime 事件发生事件
* @param id 事件id
*/
@Transactional(rollbackFor = Exception.class)
public void sendUser(Integer eventType,String type,String devId, String eventName, LocalDateTime eventTime, String id, String nDid) {
@Async("eventNotificationExecutor")
public void sendAppNotification(Integer eventType, String type, String devId,
String eventName, LocalDateTime eventTime,
String id, String nDid, Double amplitude, Double persistTime) {
int code;
List<User> users = new ArrayList<>();
List<String> eventUser;
@@ -80,11 +53,12 @@ public class SendEventUtils {
List<CsEventSendMsg> csEventSendMsgList = new ArrayList<>();
NoticeUserDto noticeUserDto = new NoticeUserDto();
NoticeUserDto.Payload payload = new NoticeUserDto.Payload();
String content;
String content = null;
List<CsEventUserPO> result = new ArrayList<>();
//获取设备类型 true:治理设备 false:其他类型的设备
boolean devModel = equipmentFeignClient.judgeDevModel(nDid).getData();
if (devModel) {
DevDetailDTO devDetailDto = csLedgerFeignClient.queryDevDetail(devId).getData();
//事件处理
if (eventType == 1){
eventName = epdFeignClient.findByName(eventName).getData().getShowName();
@@ -92,7 +66,7 @@ public class SendEventUtils {
case "1":
code = 3;
//设备自身事件 不推送给用户推送给业务管理
eventUser = getEventUser(devId,false);
eventUser = deviceMessageFeignClient.getEventUserByDeviceId(devId,false).getData();
if (CollectionUtil.isNotEmpty(eventUser)) {
eventUser.forEach(item->{
CsEventUserPO csEventUser = new CsEventUserPO();
@@ -102,7 +76,10 @@ public class SendEventUtils {
result.add(csEventUser);
});
users = getSendUser(eventUser,2);
DeviceMessageParam param1 = new DeviceMessageParam();
param1.setUserList(eventUser);
param1.setEventType(2);
users = deviceMessageFeignClient.getSendUserByType(param1).getData();
if (CollectionUtil.isNotEmpty(users)){
for (User user : users){
userList.add(user.getDevCode());
@@ -115,7 +92,7 @@ public class SendEventUtils {
case "2":
code = 0;
//暂态事件
eventUser = getEventUser(devId,true);
eventUser = deviceMessageFeignClient.getEventUserByDeviceId(devId,true).getData();
if (CollectionUtil.isNotEmpty(eventUser)) {
eventUser.forEach(item->{
CsEventUserPO csEventUser = new CsEventUserPO();
@@ -124,11 +101,19 @@ public class SendEventUtils {
csEventUser.setEventId(id);
result.add(csEventUser);
});
users = getSendUser(eventUser,0);
DeviceMessageParam param1 = new DeviceMessageParam();
param1.setUserList(eventUser);
param1.setEventType(0);
users = deviceMessageFeignClient.getSendUserByType(param1).getData();
if (CollectionUtil.isNotEmpty(users)){
devCodeList = users.stream().map(User::getDevCode).distinct().collect(Collectors.toList());
noticeUserDto.setPushClientId(devCodeList);
noticeUserDto.setTitle("暂态事件");
content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName()
+ "" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生暂态事件,事件类型:"
+ eventName
+ ",特征幅值:" + amplitude + "%"
+ ",持续时间:" + persistTime + "s";
}
}
break;
@@ -157,8 +142,9 @@ public class SendEventUtils {
break;
}
//获取台账信息
DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(devId).getData();
content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生" + eventName;
if (Objects.isNull(content)) {
content = devDetailDto.getEngineeringName() + "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName() + "" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生" + eventName;
}
noticeUserDto.setContent(content);
payload.setType(code);
payload.setPath("/pages/index/message1?type="+payload.getType());
@@ -249,57 +235,4 @@ public class SendEventUtils {
}
}
}
/**
* 获取所有需要推送的用户id
*/
public List<String> getEventUser(String devId,boolean isAdmin) {
List<User> adminUser = appUserFeignClient.getAdminInfo().getData();
List<String> adminList = adminUser.stream().map(User::getId).collect(Collectors.toList());
if (isAdmin) {
List<String> list = csDeviceUserFeignClient.findUserById(devId).getData();
adminList.addAll(list);
}
return adminList;
}
/**
* 获取所有打开推送的用户信息
*/
public List<User> getSendUser(List<String> userList,Integer type) {
List<User> users = new ArrayList<>();
List<String> result = new ArrayList<>();
List<AppInfoSet> appInfoSet = appInfoSetFeignClient.getListById(userList).getData();
switch (type) {
case 0:
result = appInfoSet.stream()
.filter(person -> person.getEventInfo() == 1)
.map(AppInfoSet::getUserId).collect(Collectors.toList());
break;
case 1:
result = appInfoSet.stream()
.filter(person -> person.getHarmonicInfo() == 1)
.map(AppInfoSet::getUserId).collect(Collectors.toList());
break;
case 2:
result = appInfoSet.stream()
.filter(person -> person.getRunInfo() == 1)
.map(AppInfoSet::getUserId).collect(Collectors.toList());
break;
case 3:
result = appInfoSet.stream()
.filter(person -> person.getAlarmInfo() == 1)
.map(AppInfoSet::getUserId).collect(Collectors.toList());
break;
default:
break;
}
if (CollectionUtil.isNotEmpty(result)){
users = userFeignClient.appuserByIdList(result).getData();
}
return users;
}
}

View File

@@ -0,0 +1,70 @@
package com.njcn.zlevent.service;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.util.StrUtil;
import com.njcn.csdevice.api.CsLedgerFeignClient;
import com.njcn.csdevice.api.SmsSendFeignClient;
import com.njcn.csdevice.pojo.dto.DevDetailDTO;
import com.njcn.cssystem.api.AppMsgSetFeignClient;
import com.njcn.user.api.UserFeignClient;
import com.njcn.user.pojo.po.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @author xy
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class SmsNotificationService {
private final AppMsgSetFeignClient appMsgSetFeignClient;
private final UserFeignClient userFeignClient;
private final SmsSendFeignClient smsSendFeignClient;
private final CsLedgerFeignClient csLedgerFeignclient;
@Value("${msg.msg_sign:南京灿能电力}")
private String msgSign;
@Async("smsNotificationExecutor")
public void sendSmsForDipEvent(String deviceId, LocalDateTime eventTime,double amplitude,double persistTime) {
try {
List<String> userIdList = appMsgSetFeignClient.queryUserIdsByDeviceId(deviceId).getData();
if (CollectionUtil.isNotEmpty(userIdList)) {
List<User> userList = userFeignClient.getUserListByIds(userIdList).getData();
if (CollectionUtil.isNotEmpty(userList)) {
List<User> userList1 = userList.stream()
.filter(item -> StrUtil.isNotBlank(item.getPhone()) && Objects.equals(item.getSmsNotice(), 1))
.collect(Collectors.toList());
if (CollectionUtil.isNotEmpty(userList1)) {
DevDetailDTO devDetailDto = csLedgerFeignclient.queryDevDetail(deviceId).getData();
String msgContent = "" + msgSign + "" + devDetailDto.getEngineeringName()
+ "-" + devDetailDto.getProjectName() + "-" + devDetailDto.getEquipmentName()
+ "" + eventTime.format(DatePattern.NORM_DATETIME_MS_FORMATTER) + "发生暂降事件"
+ ",特征幅值:" + amplitude + "%"
+ ",持续时间:" + persistTime + "s";
userList1.forEach(item -> {
try {
smsSendFeignClient.sendSmsSimple(item.getPhone(), msgContent, "verify_code");
} catch (Exception e) {
log.error("发送短信失败,手机号: {}", item.getPhone(), e);
}
});
}
}
}
} catch (Exception e) {
log.error("异步发送暂降事件短信失败设备ID: {}", deviceId, e);
}
}
}

View File

@@ -3,9 +3,7 @@ package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.access.pojo.po.CsDeviceOnlineLogs;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.EquipmentFeignClient;
@@ -21,10 +19,10 @@ import com.njcn.system.pojo.po.EleEpdPqd;
import com.njcn.system.pojo.po.SysDicTreePO;
import com.njcn.zlevent.mapper.CsEventMapper;
import com.njcn.zlevent.pojo.po.CsEventLogs;
import com.njcn.zlevent.service.AppNotificationService;
import com.njcn.zlevent.service.ICsAlarmService;
import com.njcn.zlevent.service.ICsEventLogsService;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.utils.SendEventUtils;
import lombok.AllArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@@ -52,11 +50,11 @@ public class CsAlarmServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> im
private final EquipmentFeignClient equipmentFeignClient;
private final EventServiceImpl eventService;
private final ICsEventService csEventService;
private final SendEventUtils sendEventUtils;
private final ICsEventLogsService csEventLogsService;
private final EpdFeignClient epdFeignClient;
private final RedisUtil redisUtil;
private final ChannelObjectUtil channelObjectUtil;
private final AppNotificationService appNotificationService;
@Override
@Transactional(rollbackFor = Exception.class)
@@ -128,9 +126,9 @@ public class CsAlarmServiceImpl extends ServiceImpl<CsEventMapper, CsEventPO> im
//推送事件逻辑处理 && cs_event_user入库 && 修改字典中告警事件的编码
for (AppEventMessage.DataArray item : dataArray) {
if (Objects.isNull(item.getCode())){
sendEventUtils.sendUser(2,item.getType(),po.getId(),item.getName(),eventTime,id,po.getNdid());
appNotificationService.sendAppNotification(2,item.getType(),po.getId(),item.getName(),eventTime,id,po.getNdid(),null,null);
} else {
sendEventUtils.sendUser(2,item.getType(),po.getId(),item.getCode(),eventTime,id,po.getNdid());
appNotificationService.sendAppNotification(2,item.getType(),po.getId(),item.getCode(),eventTime,id,po.getNdid(),null, null);
//更新字典信息
EleEpdPqd eleEpdPqd = epdFeignClient.findByName(item.getName()).getData();
EleEpdPqdParam.EleEpdPqdUpdateParam updateParam = new EleEpdPqdParam.EleEpdPqdUpdateParam();

View File

@@ -1,33 +1,28 @@
package com.njcn.zlevent.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.github.tocrhz.mqtt.publisher.MqttPublisher;
import com.njcn.access.api.CsTopicFeignClient;
import com.njcn.access.utils.ChannelObjectUtil;
import com.njcn.access.utils.FileCommonUtils;
import com.njcn.access.utils.MqttUtil;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DeviceMessageFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.enums.AlgorithmResponseEnum;
import com.njcn.csdevice.pojo.po.CsLinePO;
import com.njcn.mq.message.AppEventMessage;
import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.po.DictData;
import com.njcn.zlevent.pojo.constant.ZlConstant;
import com.njcn.zlevent.pojo.dto.WaveTimeDto;
import com.njcn.zlevent.service.ICsWaveAnalysisService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -43,14 +38,11 @@ import java.util.stream.Collectors;
public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
private final EquipmentFeignClient equipmentFeignClient;
private final MqttPublisher publisher;
private final CsTopicFeignClient csTopicFeignClient;
private final RedisUtil redisUtil;
private final CsLineFeignClient csLineFeignClient;
private final DicDataFeignClient dicDataFeignClient;
private final ChannelObjectUtil channelObjectUtil;
private final MqttUtil mqttUtil;
private final FileCommonUtils fileCommonUtils;
private final DeviceMessageFeignClient deviceMessageFeignClient;
private static Integer mid = 1;
@Override
@@ -59,7 +51,7 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
List<WaveTimeDto> list = new ArrayList<>();
Object object1 = redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId());
if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId());
deviceMessageFeignClient.getLineInfo(appEventMessage.getId());
}
//获取装置id
String deviceId = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData().getId();
@@ -161,29 +153,4 @@ public class CsWaveAnalysisServiceImpl implements ICsWaveAnalysisService {
waveTimeDto.setLocation(location);
return waveTimeDto;
}
/**
* 缓存监测点相关信息
*/
public void lineInfo(String id) {
Map<Integer,String> map = new HashMap<>();
List<CsLinePO> lineList = csLineFeignClient.findByNdid(id).getData();
if (CollectionUtil.isEmpty(lineList)){
throw new BusinessException(StatResponseEnum.LINE_NULL);
}
for (CsLinePO item : lineList) {
DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData();
if (Objects.isNull(dictData)){
throw new BusinessException(StatResponseEnum.DICT_NULL);
}
if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){
map.put(0,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){
map.put(1,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){
map.put(2,item.getLineId());
}
}
redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_POSITION+id,map,30L);
}
}

View File

@@ -8,6 +8,7 @@ import com.baomidou.dynamic.datasource.annotation.DSTransactional;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.njcn.common.pojo.exception.BusinessException;
import com.njcn.csdevice.api.CsLineFeignClient;
import com.njcn.csdevice.api.DeviceMessageFeignClient;
import com.njcn.csdevice.api.EquipmentFeignClient;
import com.njcn.csdevice.api.WlRecordFeignClient;
import com.njcn.csdevice.pojo.param.WlRecordParam;
@@ -26,16 +27,17 @@ import com.njcn.redis.pojo.enums.AppRedisKey;
import com.njcn.redis.utils.RedisUtil;
import com.njcn.stat.enums.StatResponseEnum;
import com.njcn.system.api.DicDataFeignClient;
import com.njcn.system.api.DictTreeFeignClient;
import com.njcn.system.api.EpdFeignClient;
import com.njcn.system.enums.DicDataEnum;
import com.njcn.system.pojo.dto.EpdDTO;
import com.njcn.system.pojo.po.DictData;
import com.njcn.zlevent.pojo.constant.ZlConstant;
import com.njcn.zlevent.service.ICsEventLogsService;
import com.njcn.zlevent.service.AppNotificationService;
import com.njcn.zlevent.service.ICsEventService;
import com.njcn.zlevent.service.IEventService;
import com.njcn.zlevent.utils.SendEventUtils;
import lombok.AllArgsConstructor;
import com.njcn.zlevent.service.SmsNotificationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
@@ -63,7 +65,7 @@ import java.util.stream.Collectors;
*/
@Service
@Slf4j
@AllArgsConstructor
@RequiredArgsConstructor
public class EventServiceImpl implements IEventService {
private final CsLineFeignClient csLineFeignClient;
@@ -73,10 +75,12 @@ public class EventServiceImpl implements IEventService {
private final ICsEventService csEventService;
private final EquipmentFeignClient equipmentFeignClient;
private final InfluxDbUtils influxDbUtils;
private final ICsEventLogsService csEventLogsService;
private final SendEventUtils sendEventUtils;
private final WlRecordFeignClient wlRecordFeignClient;
private final WlRmpEventDetailMapper wlRmpEventDetailMapper;
private final DictTreeFeignClient dictTreeFeignClient;
private final DeviceMessageFeignClient deviceMessageFeignClient;
private final AppNotificationService appNotificationService;
private final SmsNotificationService smsNotificationService;
@Override
@DSTransactional
@@ -92,14 +96,14 @@ public class EventServiceImpl implements IEventService {
}
//判断监测点是否存在
if (Objects.isNull(object1)){
lineInfo(appEventMessage.getId());
deviceMessageFeignClient.getLineInfo(appEventMessage.getId());
}
//获取装置id
CsEquipmentDeliveryPO po = equipmentFeignClient.findDevByNDid(appEventMessage.getId()).getData();
//获取设备类型 true:治理设备 false:其他类型的设备
boolean devModel = equipmentFeignClient.judgeDevModel(appEventMessage.getId()).getData();
//判断设备型号
String code = dictTreeFeignClient.queryById(po.getDevType()).getData().getCode();
try {
if (devModel) {
if (Objects.equals(DicDataEnum.CONNECT_DEV.getCode(),code)) {
if (Objects.equals(appEventMessage.getDid(),1)){
Object object = new Gson().fromJson(String.valueOf(redisUtil.getObjectByKey(AppRedisKey.LINE_POSITION+appEventMessage.getId())), Map.class).get("0");
if (ObjectUtil.isNotNull(object)) {
@@ -173,7 +177,7 @@ public class EventServiceImpl implements IEventService {
fields.put(param.getName(),param.getData());
}
//只有治理型号的设备有监测位置
if (devModel) {
if (Objects.equals(DicDataEnum.CONNECT_DEV.getCode(),code)) {
if (appEventMessage.getMsg().getClDid() == 1) {
fields.put("Evt_Param_Position","电网侧");
csEvent.setLocation("grid");
@@ -182,7 +186,7 @@ public class EventServiceImpl implements IEventService {
csEvent.setLocation("load");
}
}
//fixme 这边前置传递的应该是UTC时间,但是前置说是传递的北京时间,讨论了一下没太理解。这边暂时先这样处理,influx入库处理成北京时间,减去8小时
//fixme 设备上送的是北京时间,时序数据库录入时 需要utc时间减去8小时
Point point = influxDbUtils.pointBuilder(tableName, item.getDataTimeSec()-8*3600, TimeUnit.SECONDS, tags, fields);
BatchPoints batchPoints = BatchPoints.database(influxDbUtils.getDbName()).retentionPolicy("").consistency(InfluxDB.ConsistencyLevel.ALL).build();
batchPoints.point(point);
@@ -191,6 +195,10 @@ public class EventServiceImpl implements IEventService {
list1.add(csEvent);
}
}
//evt_data入库
if (CollectionUtil.isNotEmpty(records)) {
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
//cs_event入库
if (CollectionUtil.isNotEmpty(list1)){
csEventService.saveBatch(list1);
@@ -199,15 +207,28 @@ public class EventServiceImpl implements IEventService {
if (CollectionUtil.isNotEmpty(filterList)) {
filterList.forEach(this::insertEvent);
}
//推送事件逻辑处理 && cs_event_user入库
//异步推送事件逻辑处理 && cs_event_user入库
for (AppEventMessage.DataArray item : dataArray) {
sendEventUtils.sendUser(1,item.getType(),po.getId(),item.getName(),eventTime,id,po.getNdid());
double amplitude = 0.0;
double persistTime = 0.0;
List<AppEventMessage.Param> params = item.getParam();
if (CollectionUtil.isNotEmpty(params)) {
for (AppEventMessage.Param param : params) {
if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_VVADEPTH)) {
amplitude = Double.parseDouble(String.format("%.2f", Double.parseDouble(param.getData().toString())));
}
if (Objects.equals(param.getName(),ZlConstant.EVT_PARAM_TM)) {
persistTime = Double.parseDouble(String.format("%.2f", Double.parseDouble(param.getData().toString())));
}
}
}
appNotificationService.sendAppNotification(1, item.getType(), po.getId(), item.getName(), eventTime, id, po.getNdid(),amplitude,persistTime);
//如果是暂降事件,则异步发送短信
if (Objects.equals(item.getName(), "Evt_Sys_DipStr")) {
smsNotificationService.sendSmsForDipEvent(po.getId(), eventTime,amplitude,persistTime);
}
}
}
//evt_data入库
if (CollectionUtil.isNotEmpty(records)) {
influxDbUtils.batchInsert(influxDbUtils.getDbName(), "", InfluxDB.ConsistencyLevel.ALL, TimeUnit.MILLISECONDS, records);
}
} catch (Exception e) {
log.error("事件入库异常:{}",e.getMessage());
}
@@ -410,31 +431,6 @@ public class EventServiceImpl implements IEventService {
return Objects.isNull(result)?null:result;
}
/**
* 缓存监测点相关信息
*/
public void lineInfo(String id) {
Map<Integer,String> map = new HashMap<>();
List<CsLinePO> lineList = csLineFeignClient.findByNdid(id).getData();
if (CollectionUtil.isEmpty(lineList)){
throw new BusinessException(StatResponseEnum.LINE_NULL);
}
for (CsLinePO item : lineList) {
DictData dictData = dicDataFeignClient.getDicDataById(item.getPosition()).getData();
if (Objects.isNull(dictData)){
throw new BusinessException(StatResponseEnum.DICT_NULL);
}
if (Objects.equals(dictData.getCode(), DicDataEnum.OUTPUT_SIDE.getCode())){
map.put(0,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.GRID_SIDE.getCode())){
map.put(1,item.getLineId());
} else if (Objects.equals(dictData.getCode(), DicDataEnum.LOAD_SIDE.getCode())){
map.put(2,item.getLineId());
}
}
redisUtil.saveByKeyWithExpire(AppRedisKey.LINE_POSITION+id,map,30L);
}
/**
* 缓存字典和influxDB表关系
*/

View File

@@ -42,7 +42,7 @@ logging:
config: http://@nacos.url@/nacos/v1/cs/configs?tenant=@nacos.namespace@&group=DEFAULT_GROUP&dataId=logback.xml
level:
root: info
com.njcn.middle.rocket.handler.EnhanceConsumerMessageHandler: ERROR
#mybatis配置信息
mybatis-plus:

23
pom.xml
View File

@@ -34,11 +34,17 @@
<!--103本地-->
<!-- <middle.server.url>192.168.1.103</middle.server.url>-->
<!-- <service.server.url>192.168.1.126</service.server.url>-->
<!-- <service.server.url>192.168.2.126</service.server.url>-->
<!-- <docker.server.url>192.168.1.103</docker.server.url>-->
<!-- <nacos.url>${middle.server.url}:18848</nacos.url>-->
<!-- <nacos.namespace>72972c43-3c20-4452-a261-66624e17da97</nacos.namespace>-->
<!-- <middle.server.url>192.168.1.162</middle.server.url>-->
<!-- <service.server.url>192.168.1.162</service.server.url>-->
<!-- <docker.server.url>192.168.1.162</docker.server.url>-->
<!-- <nacos.url>${middle.server.url}:18848</nacos.url>-->
<!-- <nacos.namespace></nacos.namespace>-->
<!--103线上-->
<middle.server.url>192.168.1.103</middle.server.url>
<service.server.url>192.168.1.103</service.server.url>
@@ -61,11 +67,24 @@
<!--102-->
<!-- <middle.server.url>192.168.1.102</middle.server.url>-->
<!-- <service.server.url>127.0.0.1</service.server.url>-->
<!-- <service.server.url>192.168.1.126</service.server.url>-->
<!-- <docker.server.url>192.168.1.102</docker.server.url>-->
<!-- <nacos.url>${middle.server.url}:18848</nacos.url>-->
<!-- <nacos.namespace>d99572a5-415e-480b-bb92-30f05c2f6d93</nacos.namespace>-->
<!-- <middle.server.url>192.168.1.102</middle.server.url>-->
<!-- <service.server.url>192.168.1.126</service.server.url>-->
<!-- <docker.server.url>192.168.1.102</docker.server.url>-->
<!-- <nacos.url>${middle.server.url}:18848</nacos.url>-->
<!-- <nacos.namespace>c208a65e-1578-4372-b7c0-97fecd323fe6</nacos.namespace>-->
<!-- <middle.server.url>192.168.1.102</middle.server.url>-->
<!-- <service.server.url>192.168.1.102</service.server.url>-->
<!-- <docker.server.url>192.168.1.102</docker.server.url>-->
<!-- <nacos.url>${middle.server.url}:18848</nacos.url>-->
<!-- <nacos.namespace>c208a65e-1578-4372-b7c0-97fecd323fe6</nacos.namespace>-->
<!-- <middle.server.url>192.168.1.27</middle.server.url>-->
<!-- <service.server.url>127.0.0.1</service.server.url>-->
<!-- <docker.server.url>192.168.1.27</docker.server.url>-->