From b9fb336cac77d4aa2a85c1fa24042cf6bd9b3d72 Mon Sep 17 00:00:00 2001 From: hzj <826100833@qq.com> Date: Mon, 3 Nov 2025 16:30:58 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E6=80=81=E6=96=87=E4=BB=B6=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=8A=9F=E8=83=BD=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- filesync/.gitignore | 38 ++ filesync/pom.xml | 92 +++++ .../java/com/njcn/FileSyncApplication.java | 15 + .../njcn/device/mapper/PqDeviceMapper.java | 21 ++ .../com/njcn/device/pojo/po/PqDevice.java | 167 +++++++++ .../njcn/device/service/PqDeviceService.java | 22 ++ .../service/impl/PqDeviceServiceImpl.java | 25 ++ .../njcn/filesync/config/FileSyncState.java | 125 +++++++ .../njcn/filesync/config/ServerConfig.java | 34 ++ .../njcn/filesync/config/SourceConfig.java | 35 ++ .../njcn/filesync/config/TargetConfig.java | 35 ++ .../controller/FileSyncController.java | 51 +++ .../filesync/service/FileSyncService.java | 12 + .../service/impl/FileSyncServiceImpl.java | 50 +++ .../filesync/sftp/IncrementalFileSync.java | 329 ++++++++++++++++++ .../com/njcn/filesync/sftp/SftpClient.java | 110 ++++++ .../main/java/com/njcn/job/FileSyncJob.java | 47 +++ .../main/resources/application-wuxi_dev.yml | 104 ++++++ filesync/src/main/resources/application.yml | 84 +++++ filesync/src/test/resources/.gitkeep | 0 pom.xml | 2 + 21 files changed, 1398 insertions(+) create mode 100644 filesync/.gitignore create mode 100644 filesync/pom.xml create mode 100644 filesync/src/main/java/com/njcn/FileSyncApplication.java create mode 100644 filesync/src/main/java/com/njcn/device/mapper/PqDeviceMapper.java create mode 100644 filesync/src/main/java/com/njcn/device/pojo/po/PqDevice.java create mode 100644 filesync/src/main/java/com/njcn/device/service/PqDeviceService.java create mode 100644 filesync/src/main/java/com/njcn/device/service/impl/PqDeviceServiceImpl.java create mode 100644 filesync/src/main/java/com/njcn/filesync/config/FileSyncState.java create mode 100644 filesync/src/main/java/com/njcn/filesync/config/ServerConfig.java create mode 100644 filesync/src/main/java/com/njcn/filesync/config/SourceConfig.java create mode 100644 filesync/src/main/java/com/njcn/filesync/config/TargetConfig.java create mode 100644 filesync/src/main/java/com/njcn/filesync/controller/FileSyncController.java create mode 100644 filesync/src/main/java/com/njcn/filesync/service/FileSyncService.java create mode 100644 filesync/src/main/java/com/njcn/filesync/service/impl/FileSyncServiceImpl.java create mode 100644 filesync/src/main/java/com/njcn/filesync/sftp/IncrementalFileSync.java create mode 100644 filesync/src/main/java/com/njcn/filesync/sftp/SftpClient.java create mode 100644 filesync/src/main/java/com/njcn/job/FileSyncJob.java create mode 100644 filesync/src/main/resources/application-wuxi_dev.yml create mode 100644 filesync/src/main/resources/application.yml create mode 100644 filesync/src/test/resources/.gitkeep diff --git a/filesync/.gitignore b/filesync/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/filesync/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/filesync/pom.xml b/filesync/pom.xml new file mode 100644 index 0000000..fd4c5a2 --- /dev/null +++ b/filesync/pom.xml @@ -0,0 +1,92 @@ + + + 4.0.0 + + + + + com.njcn + data-migration + 1.0.0 + + filesync + + + + + com.njcn + njcn-common + 0.0.1 + + + + com.njcn + mybatis-plus + 0.0.1 + + + + com.njcn + spingboot2.3.12 + 2.3.12 + + + org.springframework.boot + spring-boot + + + + + com.jcraft + jsch + 0.1.55 + + + com.njcn + mybatis-plus + 0.0.1 + + + + mysql + mysql-connector-java + + + + + file-sync + + + org.springframework.boot + spring-boot-maven-plugin + + + package + + repackage + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + + + src/main/resources + + **/* + + + + + diff --git a/filesync/src/main/java/com/njcn/FileSyncApplication.java b/filesync/src/main/java/com/njcn/FileSyncApplication.java new file mode 100644 index 0000000..38ca3ff --- /dev/null +++ b/filesync/src/main/java/com/njcn/FileSyncApplication.java @@ -0,0 +1,15 @@ +package com.njcn; + +import lombok.extern.slf4j.Slf4j; +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@Slf4j +@MapperScan("com.njcn.**.mapper") +@SpringBootApplication(scanBasePackages = "com.njcn") +public class FileSyncApplication { + public static void main(String[] args) { + SpringApplication.run(FileSyncApplication.class, args); + } +} \ No newline at end of file diff --git a/filesync/src/main/java/com/njcn/device/mapper/PqDeviceMapper.java b/filesync/src/main/java/com/njcn/device/mapper/PqDeviceMapper.java new file mode 100644 index 0000000..e940a66 --- /dev/null +++ b/filesync/src/main/java/com/njcn/device/mapper/PqDeviceMapper.java @@ -0,0 +1,21 @@ +package com.njcn.device.mapper; + + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +import com.njcn.device.pojo.po.PqDevice; +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +/** + * + * Description: + * Date: 2025/06/19 下午 1:47【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface PqDeviceMapper extends BaseMapper { + +} diff --git a/filesync/src/main/java/com/njcn/device/pojo/po/PqDevice.java b/filesync/src/main/java/com/njcn/device/pojo/po/PqDevice.java new file mode 100644 index 0000000..7376a92 --- /dev/null +++ b/filesync/src/main/java/com/njcn/device/pojo/po/PqDevice.java @@ -0,0 +1,167 @@ +package com.njcn.device.pojo.po; + +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.time.LocalDate; +import java.time.LocalDateTime; + +/** + * + * Description: + * Date: 2025/06/19 下午 1:47【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +/** + * 靠靠? + */ +@Data +@TableName("pq_device") +public class PqDevice implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 装置序号 + */ + @TableId + private String id; + + /** + * 装置模型(0:虚拟设备;1:实际设备;2:离线设备;)默认是实际设备 + */ + private Integer devModel; + + /** + * 数据类型(0:暂态系统;1:稳态系统;2:两个系统) + */ + private Integer devDataType; + + /** + * 终端运行状态(0:运行;1:检修;2:停运;3:调试;4:退运) + */ + private Integer runFlag; + + /** + * 通讯状态(0:中断;1:正常) + */ + private Integer comFlag; + + /** + * 设备制造商,字典表 + */ + private String manufacturer; + + /** + * 定检状态(0:未检 1:已检) + */ + private Integer checkFlag; + + /** + * 前置类型(MMS、CLD)字典表 + */ + private String frontType; + + /** + * 终端型号(570、580……)字典表 + */ + private String devType; + + /** + * 网络参数 + */ + private String ip; + + /** + * 召唤标志(0:周期触发;1:变为触发) + */ + private Integer callFlag; + + /** + * 端口 + */ + private Integer port; + + /** + * 装置识别码(3ds加密) + */ + private String series; + + /** + * 装置秘钥(3ds加密) + */ + private String devKey; + + /** + * 前置序号Id,前置表 + */ + private String nodeId; + + /** + * 投运时间 + */ + private LocalDate loginTime; + + /** + * 数据更新时间 + */ + private LocalDateTime updateTime; + + /** + * 本次定检时间,默认等于投运时间 + */ + private LocalDate thisTimeCheck; + + /** + * 下次定检时间,默认为投运时间后推3年,假如时间小于3个月则为待检 + */ + private LocalDate nextTimeCheck; + + /** + * 电度功能 0 关闭 1开启 + */ + private Integer electroplate; + + /** + * 对时功能 0 关闭, 1开启 + */ + private Integer onTime; + + /** + * 合同号 + */ + private String contract; + + /** + * 设备sim卡号 + */ + private String sim; + + + /** + * 装置系列 + */ + private String devSeries; + + + /** + * 监测装置安装位置 + */ + private String devLocation; + + + /** + * 监测厂家设备编号 + */ + private String devNo; + + + /** + * 告警功能 0:关闭 null、1:开启 + */ + private Integer isAlarm; +} \ No newline at end of file diff --git a/filesync/src/main/java/com/njcn/device/service/PqDeviceService.java b/filesync/src/main/java/com/njcn/device/service/PqDeviceService.java new file mode 100644 index 0000000..ec4278f --- /dev/null +++ b/filesync/src/main/java/com/njcn/device/service/PqDeviceService.java @@ -0,0 +1,22 @@ +package com.njcn.device.service; + + + +import com.baomidou.mybatisplus.extension.service.IService; + +import com.njcn.device.pojo.po.PqDevice; + +import java.util.List; + +/** + * + * Description: + * Date: 2025/06/19 下午 1:47【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface PqDeviceService extends IService { + + +} diff --git a/filesync/src/main/java/com/njcn/device/service/impl/PqDeviceServiceImpl.java b/filesync/src/main/java/com/njcn/device/service/impl/PqDeviceServiceImpl.java new file mode 100644 index 0000000..77ec175 --- /dev/null +++ b/filesync/src/main/java/com/njcn/device/service/impl/PqDeviceServiceImpl.java @@ -0,0 +1,25 @@ +package com.njcn.device.service.impl; + + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.njcn.device.mapper.PqDeviceMapper; + +import com.njcn.device.pojo.po.PqDevice; +import com.njcn.device.service.PqDeviceService; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * + * Description: + * Date: 2025/06/19 下午 1:47【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Service +public class PqDeviceServiceImpl extends ServiceImpl implements PqDeviceService { + + +} diff --git a/filesync/src/main/java/com/njcn/filesync/config/FileSyncState.java b/filesync/src/main/java/com/njcn/filesync/config/FileSyncState.java new file mode 100644 index 0000000..5626b9b --- /dev/null +++ b/filesync/src/main/java/com/njcn/filesync/config/FileSyncState.java @@ -0,0 +1,125 @@ +package com.njcn.filesync.config; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.HashMap; +import java.util.Map; + +/** + * Description: + * Date: 2025/10/16 下午 2:58【需求编号】文件同步状态管理 + * + * @author clam + * @version V1.0.0 + */ +public class FileSyncState { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private String stateFilePath; + private Map fileStates; + + public static class FileMetadata implements Serializable { + private long size; + private long lastModified; + private String checksum; + + public FileMetadata() {} + + public FileMetadata(long size, long lastModified, String checksum) { + this.size = size; + this.lastModified = lastModified; + this.checksum = checksum; + } + + // Getters and Setters + public long getSize() { return size; } + public void setSize(long size) { this.size = size; } + + public long getLastModified() { return lastModified; } + public void setLastModified(long lastModified) { this.lastModified = lastModified; } + + public String getChecksum() { return checksum; } + public void setChecksum(String checksum) { this.checksum = checksum; } + } + + public FileSyncState(String stateFilePath) { + this.stateFilePath = stateFilePath; + this.fileStates = new HashMap<>(); + loadState(); + } + + @SuppressWarnings("unchecked") + private void loadState() { + File stateFile = new File(stateFilePath); + if (stateFile.exists()) { + try { + fileStates =objectMapper.readValue(stateFile, new TypeReference>() {}); + } catch (IOException e) { + System.err.println("Failed to load sync state: " + e.getMessage()); + fileStates = new HashMap<>(); + } + } + } + + public void saveState() { + try { + objectMapper.writerWithDefaultPrettyPrinter() + .writeValue(new File(stateFilePath), fileStates); + } catch (IOException e) { + System.err.println("Failed to save sync state: " + e.getMessage()); + } + } + + public boolean isFileChanged(String filePath, FileMetadata currentMetadata) { + FileMetadata previousMetadata = fileStates.get(filePath); + + if (previousMetadata == null) { + return true; // 新文件 + } + + // 检查文件大小和修改时间 + if (previousMetadata.getSize() != currentMetadata.getSize() || + previousMetadata.getLastModified() != currentMetadata.getLastModified()) { + return true; + } + + // 如果时间相同但需要更精确的比较,可以检查校验和 + if (currentMetadata.getChecksum() != null && + !currentMetadata.getChecksum().equals(previousMetadata.getChecksum())) { + return true; + } + + return false; + } + + public void updateFileState(String filePath, FileMetadata metadata) { + fileStates.put(filePath, metadata); + } + + public FileMetadata getFileMetadata(String filePath) throws IOException { + Path path = Paths.get(filePath); + BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class); + + // 计算简单的校验和(对于大文件,可以只计算部分校验和) + String checksum = calculateSimpleChecksum(path.toFile()); + + return new FileMetadata( + attrs.size(), + attrs.lastModifiedTime().toMillis(), + checksum + ); + } + + private String calculateSimpleChecksum(File file) { + // 简化的校验和计算:文件大小 + 最后修改时间 + // 对于生产环境,建议使用MD5或SHA256 + return file.length() + "-" + file.lastModified(); + } +} diff --git a/filesync/src/main/java/com/njcn/filesync/config/ServerConfig.java b/filesync/src/main/java/com/njcn/filesync/config/ServerConfig.java new file mode 100644 index 0000000..4b31821 --- /dev/null +++ b/filesync/src/main/java/com/njcn/filesync/config/ServerConfig.java @@ -0,0 +1,34 @@ +package com.njcn.filesync.config; + +import lombok.Data; + +/** + * Description: + * Date: 2025/10/16 下午 2:57【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +public class ServerConfig { + + private String host; + private int port = 22; + private String username; + private String password; + private String privateKeyPath; + private String basePath; + private String serverType; // "windows" 或 "linux" + + public boolean isWindows() { + return "windows".equalsIgnoreCase(serverType); + } + + public boolean usePasswordAuth() { + return password != null && !password.trim().isEmpty(); + } + + public boolean useKeyAuth() { + return privateKeyPath != null && !privateKeyPath.trim().isEmpty(); + } +} diff --git a/filesync/src/main/java/com/njcn/filesync/config/SourceConfig.java b/filesync/src/main/java/com/njcn/filesync/config/SourceConfig.java new file mode 100644 index 0000000..a2b379a --- /dev/null +++ b/filesync/src/main/java/com/njcn/filesync/config/SourceConfig.java @@ -0,0 +1,35 @@ +package com.njcn.filesync.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; + +/** + * Description: + * Date: 2025/10/16 下午 2:57【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +@Configuration +@Order(10) +public class SourceConfig { + @Value("${server.source.host}") + private String host; + @Value("${server.source.port}") + private int port = 22; + @Value("${server.source.username}") + + private String username; + @Value("${server.source.password}") + + private String password; + @Value("${server.source.privateKeyPath}") + + private String privateKeyPath; + @Value("${server.source.basePath}") + + private String basePath; +} diff --git a/filesync/src/main/java/com/njcn/filesync/config/TargetConfig.java b/filesync/src/main/java/com/njcn/filesync/config/TargetConfig.java new file mode 100644 index 0000000..6f4bd47 --- /dev/null +++ b/filesync/src/main/java/com/njcn/filesync/config/TargetConfig.java @@ -0,0 +1,35 @@ +package com.njcn.filesync.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; + +/** + * Description: + * Date: 2025/10/16 下午 2:57【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Data +@Configuration +@Order(10) +public class TargetConfig { + @Value("${server.target.host}") + private String host; + @Value("${server.target.port}") + + private int port; + @Value("${server.target.username}") + + private String username; + @Value("${server.target.password}") + + private String password; + @Value("${server.target.privateKeyPath}") + + private String privateKeyPath; + @Value("${server.target.basePath}") + private String basePath; +} diff --git a/filesync/src/main/java/com/njcn/filesync/controller/FileSyncController.java b/filesync/src/main/java/com/njcn/filesync/controller/FileSyncController.java new file mode 100644 index 0000000..c6de136 --- /dev/null +++ b/filesync/src/main/java/com/njcn/filesync/controller/FileSyncController.java @@ -0,0 +1,51 @@ +package com.njcn.filesync.controller; + +import com.njcn.common.pojo.annotation.OperateInfo; +import com.njcn.common.pojo.enums.common.LogEnum; +import com.njcn.common.pojo.enums.response.CommonResponseEnum; +import com.njcn.common.pojo.response.HttpResult; +import com.njcn.filesync.service.FileSyncService; +import com.njcn.web.controller.BaseController; +import com.njcn.web.utils.HttpResultUtil; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; + +/** + * Description: + * Date: 2025/10/20 下午 2:55【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Slf4j +@RestController +@RequestMapping("/filesync") +@RequiredArgsConstructor +@Api(tags = "文件同步") +public class FileSyncController extends BaseController { + private final FileSyncService fileSyncService; + /** + * 手动触发跨服务器同步 + */ + @PostMapping("/trigger") + @OperateInfo(info = LogEnum.SYSTEM_COMMON) + @ApiOperation("暂态文件同步手动触发") + public HttpResult triggerCrossSync() { + String methodDescribe = getMethodDescribe("triggerCrossSync"); + + boolean flag =fileSyncService.triggerCrossSync(); + + return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS,flag , methodDescribe); + } +} diff --git a/filesync/src/main/java/com/njcn/filesync/service/FileSyncService.java b/filesync/src/main/java/com/njcn/filesync/service/FileSyncService.java new file mode 100644 index 0000000..feda2fb --- /dev/null +++ b/filesync/src/main/java/com/njcn/filesync/service/FileSyncService.java @@ -0,0 +1,12 @@ +package com.njcn.filesync.service; + +/** + * Description: + * Date: 2025/10/20 下午 3:02【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public interface FileSyncService { + boolean triggerCrossSync(); +} diff --git a/filesync/src/main/java/com/njcn/filesync/service/impl/FileSyncServiceImpl.java b/filesync/src/main/java/com/njcn/filesync/service/impl/FileSyncServiceImpl.java new file mode 100644 index 0000000..875d683 --- /dev/null +++ b/filesync/src/main/java/com/njcn/filesync/service/impl/FileSyncServiceImpl.java @@ -0,0 +1,50 @@ +package com.njcn.filesync.service.impl; + +import com.njcn.device.pojo.po.PqDevice; +import com.njcn.device.service.PqDeviceService; +import com.njcn.filesync.config.ServerConfig; +import com.njcn.filesync.config.SourceConfig; +import com.njcn.filesync.config.TargetConfig; +import com.njcn.filesync.service.FileSyncService; +import com.njcn.filesync.sftp.IncrementalFileSync; +import jdk.nashorn.internal.ir.annotations.Reference; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Description: + * Date: 2025/10/20 下午 3:03【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +@Service +@RequiredArgsConstructor +public class FileSyncServiceImpl implements FileSyncService { + + private final SourceConfig sourceConfig; + private final TargetConfig targetConfig; + private final PqDeviceService pqDeviceService; + @Override + public boolean triggerCrossSync() { + ServerConfig source = new ServerConfig(); + ServerConfig target = new ServerConfig(); + BeanUtils.copyProperties(sourceConfig,source); + BeanUtils.copyProperties(targetConfig,target); + List list = pqDeviceService.lambdaQuery().list(); + if(!CollectionUtils.isEmpty(list)){ + List ipList = list.stream().map(PqDevice::getIp).collect(Collectors.toList()); + + IncrementalFileSync fileSync = new IncrementalFileSync( + source, target, "sync-state.json",ipList); + // 同步文件 + fileSync.syncOnce(); + } + return true; + } +} diff --git a/filesync/src/main/java/com/njcn/filesync/sftp/IncrementalFileSync.java b/filesync/src/main/java/com/njcn/filesync/sftp/IncrementalFileSync.java new file mode 100644 index 0000000..f823155 --- /dev/null +++ b/filesync/src/main/java/com/njcn/filesync/sftp/IncrementalFileSync.java @@ -0,0 +1,329 @@ +package com.njcn.filesync.sftp; + +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.SftpATTRS; +import com.jcraft.jsch.SftpException; +import com.njcn.filesync.config.FileSyncState; +import com.njcn.filesync.config.ServerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.*; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.List; +import java.util.Vector; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Description: + * Date: 2025/10/16 下午 3:02【需求编号】 + * + * @author clam + * @version V1.0.0 + */ +public class IncrementalFileSync { + + private static final Logger logger = LoggerFactory.getLogger(IncrementalFileSync.class); + + private ServerConfig sourceConfig; + private ServerConfig targetConfig; + private FileSyncState syncState; +// private ScheduledExecutorService scheduler; + private WatchService watchService; + private List ipList; + private boolean isMonitoring = false; + + public IncrementalFileSync(ServerConfig sourceConfig, ServerConfig targetConfig, String stateFilePath,List ipList) { + this.sourceConfig = sourceConfig; + this.targetConfig = targetConfig; + this.syncState = new FileSyncState(stateFilePath); +// this.scheduler = Executors.newScheduledThreadPool(2); + this.ipList = ipList; + } + + /** + * 执行一次增量同步 + */ + public void syncOnce() { + logger.info("Starting incremental synchronization..."); + + try (SftpClient sourceClient = new SftpClient(); + SftpClient targetClient = new SftpClient()) { + + // 连接到源服务器和目标服务器 + sourceClient.connect(sourceConfig); + targetClient.connect(targetConfig); + + // 同步文件 + syncDirectory("", sourceClient, targetClient); + + logger.info("Incremental synchronization completed successfully"); + + } catch (Exception e) { + logger.error("Synchronization failed", e); + } finally { + syncState.saveState(); + } + } + + /** + * 开始定时同步 + */ +// public void startScheduledSync(long interval, TimeUnit timeUnit) { +// logger.info("Starting scheduled synchronization every {} {}", interval, timeUnit); +// +// scheduler.scheduleAtFixedRate(() -> { +// try { +// syncOnce(); +// } catch (Exception e) { +// logger.error("Scheduled synchronization failed", e); +// } +// }, 0, interval, timeUnit); +// } + + /** + * 开始文件监控同步(仅当源服务器是本地文件系统时) + */ +// public void startFileWatchSync() throws IOException { +// if (!isLocalServer(sourceConfig)) { +// throw new IllegalStateException("File watching is only supported for local source servers"); +// } +// +// watchService = FileSystems.getDefault().newWatchService(); +// Path sourcePath = Paths.get(sourceConfig.getBasePath()); +// +// // 注册目录监听 +// sourcePath.register(watchService, +// StandardWatchEventKinds.ENTRY_CREATE, +// StandardWatchEventKinds.ENTRY_MODIFY, +// StandardWatchEventKinds.ENTRY_DELETE); +// +// isMonitoring = true; +// logger.info("Starting file watch synchronization for: {}", sourcePath); +// +// scheduler.execute(() -> { +// while (isMonitoring) { +// try { +// WatchKey key = watchService.take(); +// +// for (WatchEvent event : key.pollEvents()) { +// WatchEvent.Kind kind = event.kind(); +// +// if (kind == StandardWatchEventKinds.OVERFLOW) { +// continue; +// } +// +// @SuppressWarnings("unchecked") +// WatchEvent ev = (WatchEvent) event; +// Path fileName = ev.context(); +// Path fullPath = sourcePath.resolve(fileName); +// +// logger.info("File change detected: {} - {}", kind.name(), fullPath); +// +// // 延迟一下,确保文件操作完成 +// Thread.sleep(1000); +// syncOnce(); +// } +// +// key.reset(); +// +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// break; +// } catch (Exception e) { +// logger.error("File watch error", e); +// } +// } +// }); +// } + +// /** +// * 停止所有同步任务 +// */ +// public void stop() { +// isMonitoring = false; +// scheduler.shutdown(); +// +// if (watchService != null) { +// try { +// watchService.close(); +// } catch (IOException e) { +// logger.error("Error closing watch service", e); +// } +// } +// +// logger.info("File synchronization stopped"); +// } + + private void syncDirectory(String relativePath, SftpClient sourceClient, SftpClient targetClient) { +// String sourceDir = getFullPath(sourceConfig.getBasePath(), relativePath); +// String targetDir = getFullPath(targetConfig.getBasePath(), relativePath); + try { + // 如果是本地文件系统,使用Java NIO + if (isLocalServer(sourceConfig)) { + syncLocalDirectory(relativePath, sourceClient, targetClient); + } else { + // 远程服务器同步逻辑(简化实现) + syncRemoteDirectory(sourceClient, targetClient, sourceConfig.getBasePath(),targetConfig.getBasePath()); + } + + } catch (Exception e) { + logger.error("Error syncing directory: {}", relativePath, e); + } + } + + private void syncLocalDirectory(String relativePath, SftpClient sourceClient, SftpClient targetClient) + throws IOException { + + Path sourcePath = Paths.get(sourceConfig.getBasePath(), relativePath); + + Files.walkFileTree(sourcePath, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + if (Files.isRegularFile(file)) { + + String relativeFile = sourcePath.relativize(file).toString(); + // 检查文件路径是否包含ipList中的任意IP + boolean shouldSync = ipList.stream().anyMatch(ip -> { + // 检查相对路径是否以IP地址开头 + return relativeFile.startsWith(ip + File.separator) || + relativeFile.startsWith(ip + "/") || + relativeFile.startsWith(ip + "\\"); + }); + if (shouldSync){ + syncFileIfNeeded(relativeFile, sourceClient, targetClient); + } + } + return FileVisitResult.CONTINUE; + } + }); + + + + + } + private void syncRemoteDirectory(SftpClient sourceClient, SftpClient destClient, String sourceDir, String destDir) throws SftpException { + ChannelSftp destSftp = destClient.getChannel(); + ChannelSftp sourceSftp = sourceClient.getChannel(); + // 获取源目录下的文件列表 + Vector files = sourceSftp.ls(sourceDir); + + for (ChannelSftp.LsEntry entry : files) { + String filename = entry.getFilename(); + if (".".equals(filename) || "..".equals(filename)) { + continue; + } + + String sourceFilePath = sourceDir + "/" + filename; + String destFilePath = destDir + "/" + filename; + boolean shouldSync = ipList.stream().anyMatch(ip -> { + // 检查路径是否包含设备ip + return sourceFilePath.contains(ip ) ; + + }); + if(shouldSync){ + if (entry.getAttrs().isDir()) { + // 如果是目录,建目录递归同步 + + destClient.createRemoteDirectory(destFilePath); + syncRemoteDirectory(sourceClient, destClient, sourceFilePath, destFilePath); + } else { + // 如果是文件,判断是否需要同步 + if (needSync(sourceSftp, destSftp, sourceFilePath, destFilePath, entry)) { + + // 传输文件 + try (InputStream inputStream = sourceSftp.get(sourceFilePath)) { + destSftp.put(inputStream, destFilePath); + System.out.println("Synced: " + sourceFilePath); + } catch (Exception e) { + e.printStackTrace(); + } + + } else { + System.out.println("Skipped: " + sourceFilePath); + } + } + } + + } + + } + + private boolean needSync(ChannelSftp sourceSftp, ChannelSftp destSftp, String sourcePath, String destPath, ChannelSftp.LsEntry sourceEntry) throws SftpException { + // 判断目标文件是否存在 + SftpATTRS destAttrs; + try { + destAttrs = destSftp.lstat(destPath); + } catch (SftpException e) { + // 如果目标文件不存在,肯定需要同步 + return true; + } + + // 比较文件的修改时间 + long sourceMTime = sourceEntry.getAttrs().getMTime(); + long destMTime = destAttrs.getMTime(); + + // 如果源文件的修改时间晚于目标文件,则需要同步 + if (sourceMTime > destMTime) { + return true; + } + + // 你也可以选择性地比较文件大小 + // return sourceEntry.getAttrs().getSize() != destAttrs.getSize(); + + return false; + } + + private void syncFileIfNeeded( String relativeFile, SftpClient sourceClient, SftpClient targetClient) { + try { + String sourceFullPath = getFullPath(sourceConfig.getBasePath(), relativeFile); + FileSyncState.FileMetadata currentMetadata = syncState.getFileMetadata(sourceFullPath); + + if (syncState.isFileChanged(relativeFile, currentMetadata)) { + logger.info("Syncing changed file: {}", relativeFile); + + // 上传文件 + targetClient.uploadFile(sourceFullPath, + getFullPath(targetConfig.getBasePath(), + new File(relativeFile).getParent() != null ? + new File(relativeFile).getParent() : "")); + + // 更新同步状态 + syncState.updateFileState(relativeFile, currentMetadata); + } else { + logger.debug("File unchanged, skipping: {}", relativeFile); + } + + } catch (Exception e) { + logger.error("Error syncing file: {}", relativeFile, e); + } + } + + private String getFullPath(String basePath, String relativePath) { + if (relativePath == null || relativePath.isEmpty()) { + return basePath; + } + + if (basePath.endsWith("/") || basePath.endsWith("\\")) { + return basePath + relativePath; + } else { + if(basePath.contains("/")){ + return basePath + "/" + relativePath; + }else if(basePath.contains("\\")){ + return basePath + "\\" + relativePath; + } + + } + return basePath; + } + + private boolean isLocalServer(ServerConfig config) { + // 简单的本地服务器检测逻辑 + return "localhost".equals(config.getHost()) || + "127.0.0.1".equals(config.getHost()); + } +} diff --git a/filesync/src/main/java/com/njcn/filesync/sftp/SftpClient.java b/filesync/src/main/java/com/njcn/filesync/sftp/SftpClient.java new file mode 100644 index 0000000..ba9eecd --- /dev/null +++ b/filesync/src/main/java/com/njcn/filesync/sftp/SftpClient.java @@ -0,0 +1,110 @@ +package com.njcn.filesync.sftp; + +import com.jcraft.jsch.*; +import com.njcn.filesync.config.ServerConfig; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.io.File; +import java.util.Vector; + +/** + * Description: + * Date: 2025/10/16 下午 3:00【需求编号】 + *SFTP客户端实现 + * @author clam + * @version V1.0.0 + */ +@Service +@Data +public class SftpClient implements AutoCloseable{ + private static final Logger logger = LoggerFactory.getLogger(SftpClient.class); + + private JSch jsch; + private Session session; + private ChannelSftp channel; + + public void connect(ServerConfig config) throws JSchException { + jsch = new JSch(); + + // 设置私钥(如果提供) + if (config.getPrivateKeyPath() != null && !config.getPrivateKeyPath().isEmpty()) { + jsch.addIdentity(config.getPrivateKeyPath()); + } + + session = jsch.getSession(config.getUsername(), config.getHost(), config.getPort()); + + // 设置密码(如果提供) + if (config.getPassword() != null && !config.getPassword().isEmpty()) { + session.setPassword(config.getPassword()); + } + + // 配置SSH连接 + java.util.Properties sshConfig = new java.util.Properties(); + sshConfig.put("StrictHostKeyChecking", "no"); + session.setConfig(sshConfig); + + session.connect(30000); // 30秒超时 + + // 打开SFTP通道 + channel = (ChannelSftp) session.openChannel("sftp"); + channel.connect(30000); + } + public void uploadFile(String localFile, String remotePath) throws SftpException { + File file = new File(localFile); + String remoteFile = ensureRemotePath(remotePath, file.getName()); + + // 确保远程目录存在 + createRemoteDirectory(remotePath); + + // 上传文件 + channel.put(localFile, remoteFile); + } + + public ChannelSftp.LsEntry getRemoteFileInfo(String remotePath) throws SftpException { + @SuppressWarnings("unchecked") + Vector files = channel.ls(remotePath); + if (files != null && !files.isEmpty()) { + return files.get(0); + } + return null; + } + + private String ensureRemotePath(String remotePath, String fileName) { + if (remotePath.endsWith("/")) { + return remotePath + fileName; + } else { + return remotePath + "/" + fileName; + } + } + + void createRemoteDirectory(String remotePath) throws SftpException { + + // 处理Windows路径分隔符 + String normalizedPath = remotePath.replace("\\", "/"); + String[] directories = normalizedPath.split("/"); + StringBuilder currentPath = new StringBuilder(); + + try { + channel.mkdir(normalizedPath); + logger.debug("Created remote directory: {}", currentPath); + } catch (SftpException e) { + if (e.id != ChannelSftp.SSH_FX_FAILURE) { + // 目录可能已存在,忽略这个错误 + logger.debug("Directory may already exist: {}", currentPath); + } + } + } + + @Override + public void close() { + if (channel != null) { + channel.disconnect(); + } + if (session != null) { + session.disconnect(); + } + } +} diff --git a/filesync/src/main/java/com/njcn/job/FileSyncJob.java b/filesync/src/main/java/com/njcn/job/FileSyncJob.java new file mode 100644 index 0000000..cdac6dd --- /dev/null +++ b/filesync/src/main/java/com/njcn/job/FileSyncJob.java @@ -0,0 +1,47 @@ +package com.njcn.job; + +import com.njcn.device.pojo.po.PqDevice; +import com.njcn.device.service.PqDeviceService; +import com.njcn.filesync.config.*; +import com.njcn.filesync.service.FileSyncService; +import com.njcn.filesync.sftp.IncrementalFileSync; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * @Author: cdf + * @CreateTime: 2025-09-19 + * @Description: 定时任务 + */ +@Component +@EnableScheduling +@RequiredArgsConstructor +@Slf4j +public class FileSyncJob { + + private final SourceConfig sourceConfig; + private final TargetConfig targetConfig; + private final PqDeviceService pqDeviceService; + + private final FileSyncService fileSyncService; + // 每10分钟执行 + @Scheduled(cron = "0 0/10 * * * ? ") + public void UpHarmonicJob(){ + + fileSyncService.triggerCrossSync(); + + + } + + +} diff --git a/filesync/src/main/resources/application-wuxi_dev.yml b/filesync/src/main/resources/application-wuxi_dev.yml new file mode 100644 index 0000000..0643017 --- /dev/null +++ b/filesync/src/main/resources/application-wuxi_dev.yml @@ -0,0 +1,104 @@ +#当前服务的基本信息 +spring: + datasource: + druid: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://192.168.1.24:13306/pqsinfo_wuxi?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=Asia/Shanghai + username: root + password: njcnpqs + # url: jdbc:mysql://localhost:3306/pqs91001?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT + # username: root + # password: root + #初始化建立物理连接的个数、最小、最大连接数 + initial-size: 5 + min-idle: 5 + max-active: 50 + #获取连接最大等待时间,单位毫秒 + max-wait: 60000 + #链接保持空间而不被驱逐的最长时间,单位毫秒 + min-evictable-idle-time-millis: 300000 + validation-query: select 1 + test-while-idle: true + test-on-borrow: false + test-on-return: false + pool-prepared-statements: true + max-pool-prepared-statement-per-connection-size: 20 + + + servlet: + multipart: + max-file-size: 100MB + max-request-size: 100MB + #influxDB内容配置 + influx: + url: http://192.168.1.103:18086 + user: admin + password: 123456 + database: pqsbase_wx + mapper-location: com.njcn.**.imapper +#mybatis配置信息 +mybatis-plus: + mapper-locations: classpath*:com/njcn/**/mapping/*.xml + #别名扫描 + type-aliases-package: com.njcn.product.**.pojo + configuration: + #驼峰命名 + map-underscore-to-camel-case: true + #配置sql日志输出 + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl + # #关闭日志输出 + # log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl + global-config: + db-config: + #指定主键生成策略 + id-type: assign_uuid +db: + type: mysql +#文件位置配置 +business: + #处理波形数据位置 + wavePath: D://comtrade + #wavePath: /usr/local/comtrade + #处理临时数据 + #tempPath: D://file + tempPath: /usr/local/file + #文件存储的方式 1.本地 + file: + storage: 1 + #localStoragePath: /usr/local/localStoragePath + localStoragePath: D://localStoragePath +#oss服务器配置 +min: + io: + endpoint: http://192.168.1.22:9009 + accessKey: minio + secretKey: minio@123 + bucket: excelreport + #华为obs服务器配置 +huawei: + access-key: J9GS9EA79PZ60OK23LWP + security-key: BirGrAFDSLxU8ow5fffyXgZRAmMRb1R1AdqCI60d + obs: + bucket: test-8601 + endpoint: https://obs.cn-east-3.myhuaweicloud.com + # 单位为秒 + expire: 3600 + +server: + source: + host: 192.168.1.63 + port: 22 + username: root + password: dnzl@#001 + basePath: / + privateKeyPath: + target: + host: 192.168.1.103 + port: 22 + username: root + password: dnzl@#001 + basePath: /usr/local/wuxi + privateKeyPath: + + + diff --git a/filesync/src/main/resources/application.yml b/filesync/src/main/resources/application.yml new file mode 100644 index 0000000..5ca7e06 --- /dev/null +++ b/filesync/src/main/resources/application.yml @@ -0,0 +1,84 @@ +#当前服务的基本信息 +microservice: + ename: file-sync + name: file-sync +server: + port: 19002 +spring: + application: + name: file-sync + + profiles: + active: wuxi_dev + + + jackson: + time-zone: GMT+8 + date-format: yyyy-MM-dd HH:mm:ss + locale: zh_CN + serialization: + # 格式化输出 + indent_output: false + servlet: + multipart: + max-file-size: 100MB + max-request-size: 100MB + +#mybatis配置信息 +mybatis-plus: + mapper-locations: classpath*:com/njcn/**/mapping/*.xml + #别名扫描 + type-aliases-package: com.njcn.filesync.**.pojo + configuration: + #驼峰命名 + map-underscore-to-camel-case: true + #配置sql日志输出 + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl + # #关闭日志输出 + # log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl + global-config: + db-config: + #指定主键生成策略 + id-type: assign_uuid +db: + type: mysql +#文件位置配置 +business: + #处理波形数据位置 + wavePath: D://comtrade + #wavePath: /usr/local/comtrade + #处理临时数据 + #tempPath: D://file + tempPath: /usr/local/file + #文件存储的方式 1.本地 + file: + storage: 1 + #localStoragePath: /usr/local/localStoragePath + localStoragePath: f://localStoragePath +#oss服务器配置 +min: + io: + endpoint: http://192.168.1.22:9009 + accessKey: minio + secretKey: minio@123 + bucket: excelreport + #华为obs服务器配置 +huawei: + access-key: J9GS9EA79PZ60OK23LWP + security-key: BirGrAFDSLxU8ow5fffyXgZRAmMRb1R1AdqCI60d + obs: + bucket: test-8601 + endpoint: https://obs.cn-east-3.myhuaweicloud.com + # 单位为秒 + expire: 3600 + +#线程池配置信息 +threadPool: + corePoolSize: 10 + maxPoolSize: 20 + queueCapacity: 500 + keepAliveSeconds: 60 +file: + upload-dir: D:/carry + + diff --git a/filesync/src/test/resources/.gitkeep b/filesync/src/test/resources/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/pom.xml b/pom.xml index 7c57619..cb5ed70 100644 --- a/pom.xml +++ b/pom.xml @@ -11,6 +11,8 @@ mysql-data influx-data manufact_influx_data + filesync + pom 灿能数据迁移系统