暂态文件同步功能开发

This commit is contained in:
hzj
2025-11-03 16:30:58 +08:00
parent 8900f41486
commit b9fb336cac
21 changed files with 1398 additions and 0 deletions

38
filesync/.gitignore vendored Normal file
View File

@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

92
filesync/pom.xml Normal file
View File

@@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.njcn</groupId>
<artifactId>data-migration</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>filesync</artifactId>
<dependencies>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>njcn-common</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>mybatis-plus</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>spingboot2.3.12</artifactId>
<version>2.3.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
</dependency>
<!-- SSH/SFTP 客户端 -->
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.55</version>
</dependency>
<dependency>
<groupId>com.njcn</groupId>
<artifactId>mybatis-plus</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
<build>
<finalName>file-sync</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*</include>
</includes>
</resource>
</resources>
</build>
</project>

View File

@@ -0,0 +1,15 @@
package com.njcn;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@Slf4j
@MapperScan("com.njcn.**.mapper")
@SpringBootApplication(scanBasePackages = "com.njcn")
public class FileSyncApplication {
public static void main(String[] args) {
SpringApplication.run(FileSyncApplication.class, args);
}
}

View File

@@ -0,0 +1,21 @@
package com.njcn.device.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.njcn.device.pojo.po.PqDevice;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
*
* Description:
* Date: 2025/06/19 下午 1:47【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface PqDeviceMapper extends BaseMapper<PqDevice> {
}

View File

@@ -0,0 +1,167 @@
package com.njcn.device.pojo.po;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
*
* Description:
* Date: 2025/06/19 下午 1:47【需求编号】
*
* @author clam
* @version V1.0.0
*/
/**
* 靠靠?
*/
@Data
@TableName("pq_device")
public class PqDevice implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 装置序号
*/
@TableId
private String id;
/**
* 装置模型0虚拟设备1实际设备2离线设备默认是实际设备
*/
private Integer devModel;
/**
* 数据类型0暂态系统1稳态系统2两个系统
*/
private Integer devDataType;
/**
* 终端运行状态0运行1检修2停运3调试4退运
*/
private Integer runFlag;
/**
* 通讯状态0中断1正常
*/
private Integer comFlag;
/**
* 设备制造商,字典表
*/
private String manufacturer;
/**
* 定检状态0未检 1已检
*/
private Integer checkFlag;
/**
* 前置类型MMS、CLD字典表
*/
private String frontType;
/**
* 终端型号570、580……字典表
*/
private String devType;
/**
* 网络参数
*/
private String ip;
/**
* 召唤标志0周期触发1变为触发
*/
private Integer callFlag;
/**
* 端口
*/
private Integer port;
/**
* 装置识别码3ds加密
*/
private String series;
/**
* 装置秘钥3ds加密
*/
private String devKey;
/**
* 前置序号Id,前置表
*/
private String nodeId;
/**
* 投运时间
*/
private LocalDate loginTime;
/**
* 数据更新时间
*/
private LocalDateTime updateTime;
/**
* 本次定检时间,默认等于投运时间
*/
private LocalDate thisTimeCheck;
/**
* 下次定检时间,默认为投运时间后推3年假如时间小于3个月则为待检
*/
private LocalDate nextTimeCheck;
/**
* 电度功能 0 关闭 1开启
*/
private Integer electroplate;
/**
* 对时功能 0 关闭, 1开启
*/
private Integer onTime;
/**
* 合同号
*/
private String contract;
/**
* 设备sim卡号
*/
private String sim;
/**
* 装置系列
*/
private String devSeries;
/**
* 监测装置安装位置
*/
private String devLocation;
/**
* 监测厂家设备编号
*/
private String devNo;
/**
* 告警功能 0:关闭 null、1:开启
*/
private Integer isAlarm;
}

View File

@@ -0,0 +1,22 @@
package com.njcn.device.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.njcn.device.pojo.po.PqDevice;
import java.util.List;
/**
*
* Description:
* Date: 2025/06/19 下午 1:47【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface PqDeviceService extends IService<PqDevice> {
}

View File

@@ -0,0 +1,25 @@
package com.njcn.device.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.njcn.device.mapper.PqDeviceMapper;
import com.njcn.device.pojo.po.PqDevice;
import com.njcn.device.service.PqDeviceService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
*
* Description:
* Date: 2025/06/19 下午 1:47【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
public class PqDeviceServiceImpl extends ServiceImpl<PqDeviceMapper, PqDevice> implements PqDeviceService {
}

View File

@@ -0,0 +1,125 @@
package com.njcn.filesync.config;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
/**
* Description:
* Date: 2025/10/16 下午 2:58【需求编号】文件同步状态管理
*
* @author clam
* @version V1.0.0
*/
public class FileSyncState {
private static final ObjectMapper objectMapper = new ObjectMapper();
private String stateFilePath;
private Map<String, FileMetadata> fileStates;
public static class FileMetadata implements Serializable {
private long size;
private long lastModified;
private String checksum;
public FileMetadata() {}
public FileMetadata(long size, long lastModified, String checksum) {
this.size = size;
this.lastModified = lastModified;
this.checksum = checksum;
}
// Getters and Setters
public long getSize() { return size; }
public void setSize(long size) { this.size = size; }
public long getLastModified() { return lastModified; }
public void setLastModified(long lastModified) { this.lastModified = lastModified; }
public String getChecksum() { return checksum; }
public void setChecksum(String checksum) { this.checksum = checksum; }
}
public FileSyncState(String stateFilePath) {
this.stateFilePath = stateFilePath;
this.fileStates = new HashMap<>();
loadState();
}
@SuppressWarnings("unchecked")
private void loadState() {
File stateFile = new File(stateFilePath);
if (stateFile.exists()) {
try {
fileStates =objectMapper.readValue(stateFile, new TypeReference<Map<String, FileMetadata>>() {});
} catch (IOException e) {
System.err.println("Failed to load sync state: " + e.getMessage());
fileStates = new HashMap<>();
}
}
}
public void saveState() {
try {
objectMapper.writerWithDefaultPrettyPrinter()
.writeValue(new File(stateFilePath), fileStates);
} catch (IOException e) {
System.err.println("Failed to save sync state: " + e.getMessage());
}
}
public boolean isFileChanged(String filePath, FileMetadata currentMetadata) {
FileMetadata previousMetadata = fileStates.get(filePath);
if (previousMetadata == null) {
return true; // 新文件
}
// 检查文件大小和修改时间
if (previousMetadata.getSize() != currentMetadata.getSize() ||
previousMetadata.getLastModified() != currentMetadata.getLastModified()) {
return true;
}
// 如果时间相同但需要更精确的比较,可以检查校验和
if (currentMetadata.getChecksum() != null &&
!currentMetadata.getChecksum().equals(previousMetadata.getChecksum())) {
return true;
}
return false;
}
public void updateFileState(String filePath, FileMetadata metadata) {
fileStates.put(filePath, metadata);
}
public FileMetadata getFileMetadata(String filePath) throws IOException {
Path path = Paths.get(filePath);
BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
// 计算简单的校验和(对于大文件,可以只计算部分校验和)
String checksum = calculateSimpleChecksum(path.toFile());
return new FileMetadata(
attrs.size(),
attrs.lastModifiedTime().toMillis(),
checksum
);
}
private String calculateSimpleChecksum(File file) {
// 简化的校验和计算:文件大小 + 最后修改时间
// 对于生产环境建议使用MD5或SHA256
return file.length() + "-" + file.lastModified();
}
}

View File

@@ -0,0 +1,34 @@
package com.njcn.filesync.config;
import lombok.Data;
/**
* Description:
* Date: 2025/10/16 下午 2:57【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
public class ServerConfig {
private String host;
private int port = 22;
private String username;
private String password;
private String privateKeyPath;
private String basePath;
private String serverType; // "windows" 或 "linux"
public boolean isWindows() {
return "windows".equalsIgnoreCase(serverType);
}
public boolean usePasswordAuth() {
return password != null && !password.trim().isEmpty();
}
public boolean useKeyAuth() {
return privateKeyPath != null && !privateKeyPath.trim().isEmpty();
}
}

View File

@@ -0,0 +1,35 @@
package com.njcn.filesync.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
/**
* Description:
* Date: 2025/10/16 下午 2:57【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
@Configuration
@Order(10)
public class SourceConfig {
@Value("${server.source.host}")
private String host;
@Value("${server.source.port}")
private int port = 22;
@Value("${server.source.username}")
private String username;
@Value("${server.source.password}")
private String password;
@Value("${server.source.privateKeyPath}")
private String privateKeyPath;
@Value("${server.source.basePath}")
private String basePath;
}

View File

@@ -0,0 +1,35 @@
package com.njcn.filesync.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
/**
* Description:
* Date: 2025/10/16 下午 2:57【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Data
@Configuration
@Order(10)
public class TargetConfig {
@Value("${server.target.host}")
private String host;
@Value("${server.target.port}")
private int port;
@Value("${server.target.username}")
private String username;
@Value("${server.target.password}")
private String password;
@Value("${server.target.privateKeyPath}")
private String privateKeyPath;
@Value("${server.target.basePath}")
private String basePath;
}

View File

@@ -0,0 +1,51 @@
package com.njcn.filesync.controller;
import com.njcn.common.pojo.annotation.OperateInfo;
import com.njcn.common.pojo.enums.common.LogEnum;
import com.njcn.common.pojo.enums.response.CommonResponseEnum;
import com.njcn.common.pojo.response.HttpResult;
import com.njcn.filesync.service.FileSyncService;
import com.njcn.web.controller.BaseController;
import com.njcn.web.utils.HttpResultUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
* Description:
* Date: 2025/10/20 下午 2:55【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Slf4j
@RestController
@RequestMapping("/filesync")
@RequiredArgsConstructor
@Api(tags = "文件同步")
public class FileSyncController extends BaseController {
private final FileSyncService fileSyncService;
/**
* 手动触发跨服务器同步
*/
@PostMapping("/trigger")
@OperateInfo(info = LogEnum.SYSTEM_COMMON)
@ApiOperation("暂态文件同步手动触发")
public HttpResult<Boolean> triggerCrossSync() {
String methodDescribe = getMethodDescribe("triggerCrossSync");
boolean flag =fileSyncService.triggerCrossSync();
return HttpResultUtil.assembleCommonResponseResult(CommonResponseEnum.SUCCESS,flag , methodDescribe);
}
}

View File

@@ -0,0 +1,12 @@
package com.njcn.filesync.service;
/**
* Description:
* Date: 2025/10/20 下午 3:02【需求编号】
*
* @author clam
* @version V1.0.0
*/
public interface FileSyncService {
boolean triggerCrossSync();
}

View File

@@ -0,0 +1,50 @@
package com.njcn.filesync.service.impl;
import com.njcn.device.pojo.po.PqDevice;
import com.njcn.device.service.PqDeviceService;
import com.njcn.filesync.config.ServerConfig;
import com.njcn.filesync.config.SourceConfig;
import com.njcn.filesync.config.TargetConfig;
import com.njcn.filesync.service.FileSyncService;
import com.njcn.filesync.sftp.IncrementalFileSync;
import jdk.nashorn.internal.ir.annotations.Reference;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.stream.Collectors;
/**
* Description:
* Date: 2025/10/20 下午 3:03【需求编号】
*
* @author clam
* @version V1.0.0
*/
@Service
@RequiredArgsConstructor
public class FileSyncServiceImpl implements FileSyncService {
private final SourceConfig sourceConfig;
private final TargetConfig targetConfig;
private final PqDeviceService pqDeviceService;
@Override
public boolean triggerCrossSync() {
ServerConfig source = new ServerConfig();
ServerConfig target = new ServerConfig();
BeanUtils.copyProperties(sourceConfig,source);
BeanUtils.copyProperties(targetConfig,target);
List<PqDevice> list = pqDeviceService.lambdaQuery().list();
if(!CollectionUtils.isEmpty(list)){
List<String> ipList = list.stream().map(PqDevice::getIp).collect(Collectors.toList());
IncrementalFileSync fileSync = new IncrementalFileSync(
source, target, "sync-state.json",ipList);
// 同步文件
fileSync.syncOnce();
}
return true;
}
}

View File

@@ -0,0 +1,329 @@
package com.njcn.filesync.sftp;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.SftpATTRS;
import com.jcraft.jsch.SftpException;
import com.njcn.filesync.config.FileSyncState;
import com.njcn.filesync.config.ServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.Vector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Description:
* Date: 2025/10/16 下午 3:02【需求编号】
*
* @author clam
* @version V1.0.0
*/
public class IncrementalFileSync {
private static final Logger logger = LoggerFactory.getLogger(IncrementalFileSync.class);
private ServerConfig sourceConfig;
private ServerConfig targetConfig;
private FileSyncState syncState;
// private ScheduledExecutorService scheduler;
private WatchService watchService;
private List<String> ipList;
private boolean isMonitoring = false;
public IncrementalFileSync(ServerConfig sourceConfig, ServerConfig targetConfig, String stateFilePath,List<String> ipList) {
this.sourceConfig = sourceConfig;
this.targetConfig = targetConfig;
this.syncState = new FileSyncState(stateFilePath);
// this.scheduler = Executors.newScheduledThreadPool(2);
this.ipList = ipList;
}
/**
* 执行一次增量同步
*/
public void syncOnce() {
logger.info("Starting incremental synchronization...");
try (SftpClient sourceClient = new SftpClient();
SftpClient targetClient = new SftpClient()) {
// 连接到源服务器和目标服务器
sourceClient.connect(sourceConfig);
targetClient.connect(targetConfig);
// 同步文件
syncDirectory("", sourceClient, targetClient);
logger.info("Incremental synchronization completed successfully");
} catch (Exception e) {
logger.error("Synchronization failed", e);
} finally {
syncState.saveState();
}
}
/**
* 开始定时同步
*/
// public void startScheduledSync(long interval, TimeUnit timeUnit) {
// logger.info("Starting scheduled synchronization every {} {}", interval, timeUnit);
//
// scheduler.scheduleAtFixedRate(() -> {
// try {
// syncOnce();
// } catch (Exception e) {
// logger.error("Scheduled synchronization failed", e);
// }
// }, 0, interval, timeUnit);
// }
/**
* 开始文件监控同步(仅当源服务器是本地文件系统时)
*/
// public void startFileWatchSync() throws IOException {
// if (!isLocalServer(sourceConfig)) {
// throw new IllegalStateException("File watching is only supported for local source servers");
// }
//
// watchService = FileSystems.getDefault().newWatchService();
// Path sourcePath = Paths.get(sourceConfig.getBasePath());
//
// // 注册目录监听
// sourcePath.register(watchService,
// StandardWatchEventKinds.ENTRY_CREATE,
// StandardWatchEventKinds.ENTRY_MODIFY,
// StandardWatchEventKinds.ENTRY_DELETE);
//
// isMonitoring = true;
// logger.info("Starting file watch synchronization for: {}", sourcePath);
//
// scheduler.execute(() -> {
// while (isMonitoring) {
// try {
// WatchKey key = watchService.take();
//
// for (WatchEvent<?> event : key.pollEvents()) {
// WatchEvent.Kind<?> kind = event.kind();
//
// if (kind == StandardWatchEventKinds.OVERFLOW) {
// continue;
// }
//
// @SuppressWarnings("unchecked")
// WatchEvent<Path> ev = (WatchEvent<Path>) event;
// Path fileName = ev.context();
// Path fullPath = sourcePath.resolve(fileName);
//
// logger.info("File change detected: {} - {}", kind.name(), fullPath);
//
// // 延迟一下,确保文件操作完成
// Thread.sleep(1000);
// syncOnce();
// }
//
// key.reset();
//
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// break;
// } catch (Exception e) {
// logger.error("File watch error", e);
// }
// }
// });
// }
// /**
// * 停止所有同步任务
// */
// public void stop() {
// isMonitoring = false;
// scheduler.shutdown();
//
// if (watchService != null) {
// try {
// watchService.close();
// } catch (IOException e) {
// logger.error("Error closing watch service", e);
// }
// }
//
// logger.info("File synchronization stopped");
// }
private void syncDirectory(String relativePath, SftpClient sourceClient, SftpClient targetClient) {
// String sourceDir = getFullPath(sourceConfig.getBasePath(), relativePath);
// String targetDir = getFullPath(targetConfig.getBasePath(), relativePath);
try {
// 如果是本地文件系统使用Java NIO
if (isLocalServer(sourceConfig)) {
syncLocalDirectory(relativePath, sourceClient, targetClient);
} else {
// 远程服务器同步逻辑(简化实现)
syncRemoteDirectory(sourceClient, targetClient, sourceConfig.getBasePath(),targetConfig.getBasePath());
}
} catch (Exception e) {
logger.error("Error syncing directory: {}", relativePath, e);
}
}
private void syncLocalDirectory(String relativePath, SftpClient sourceClient, SftpClient targetClient)
throws IOException {
Path sourcePath = Paths.get(sourceConfig.getBasePath(), relativePath);
Files.walkFileTree(sourcePath, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Files.isRegularFile(file)) {
String relativeFile = sourcePath.relativize(file).toString();
// 检查文件路径是否包含ipList中的任意IP
boolean shouldSync = ipList.stream().anyMatch(ip -> {
// 检查相对路径是否以IP地址开头
return relativeFile.startsWith(ip + File.separator) ||
relativeFile.startsWith(ip + "/") ||
relativeFile.startsWith(ip + "\\");
});
if (shouldSync){
syncFileIfNeeded(relativeFile, sourceClient, targetClient);
}
}
return FileVisitResult.CONTINUE;
}
});
}
private void syncRemoteDirectory(SftpClient sourceClient, SftpClient destClient, String sourceDir, String destDir) throws SftpException {
ChannelSftp destSftp = destClient.getChannel();
ChannelSftp sourceSftp = sourceClient.getChannel();
// 获取源目录下的文件列表
Vector<ChannelSftp.LsEntry> files = sourceSftp.ls(sourceDir);
for (ChannelSftp.LsEntry entry : files) {
String filename = entry.getFilename();
if (".".equals(filename) || "..".equals(filename)) {
continue;
}
String sourceFilePath = sourceDir + "/" + filename;
String destFilePath = destDir + "/" + filename;
boolean shouldSync = ipList.stream().anyMatch(ip -> {
// 检查路径是否包含设备ip
return sourceFilePath.contains(ip ) ;
});
if(shouldSync){
if (entry.getAttrs().isDir()) {
// 如果是目录,建目录递归同步
destClient.createRemoteDirectory(destFilePath);
syncRemoteDirectory(sourceClient, destClient, sourceFilePath, destFilePath);
} else {
// 如果是文件,判断是否需要同步
if (needSync(sourceSftp, destSftp, sourceFilePath, destFilePath, entry)) {
// 传输文件
try (InputStream inputStream = sourceSftp.get(sourceFilePath)) {
destSftp.put(inputStream, destFilePath);
System.out.println("Synced: " + sourceFilePath);
} catch (Exception e) {
e.printStackTrace();
}
} else {
System.out.println("Skipped: " + sourceFilePath);
}
}
}
}
}
private boolean needSync(ChannelSftp sourceSftp, ChannelSftp destSftp, String sourcePath, String destPath, ChannelSftp.LsEntry sourceEntry) throws SftpException {
// 判断目标文件是否存在
SftpATTRS destAttrs;
try {
destAttrs = destSftp.lstat(destPath);
} catch (SftpException e) {
// 如果目标文件不存在,肯定需要同步
return true;
}
// 比较文件的修改时间
long sourceMTime = sourceEntry.getAttrs().getMTime();
long destMTime = destAttrs.getMTime();
// 如果源文件的修改时间晚于目标文件,则需要同步
if (sourceMTime > destMTime) {
return true;
}
// 你也可以选择性地比较文件大小
// return sourceEntry.getAttrs().getSize() != destAttrs.getSize();
return false;
}
private void syncFileIfNeeded( String relativeFile, SftpClient sourceClient, SftpClient targetClient) {
try {
String sourceFullPath = getFullPath(sourceConfig.getBasePath(), relativeFile);
FileSyncState.FileMetadata currentMetadata = syncState.getFileMetadata(sourceFullPath);
if (syncState.isFileChanged(relativeFile, currentMetadata)) {
logger.info("Syncing changed file: {}", relativeFile);
// 上传文件
targetClient.uploadFile(sourceFullPath,
getFullPath(targetConfig.getBasePath(),
new File(relativeFile).getParent() != null ?
new File(relativeFile).getParent() : ""));
// 更新同步状态
syncState.updateFileState(relativeFile, currentMetadata);
} else {
logger.debug("File unchanged, skipping: {}", relativeFile);
}
} catch (Exception e) {
logger.error("Error syncing file: {}", relativeFile, e);
}
}
private String getFullPath(String basePath, String relativePath) {
if (relativePath == null || relativePath.isEmpty()) {
return basePath;
}
if (basePath.endsWith("/") || basePath.endsWith("\\")) {
return basePath + relativePath;
} else {
if(basePath.contains("/")){
return basePath + "/" + relativePath;
}else if(basePath.contains("\\")){
return basePath + "\\" + relativePath;
}
}
return basePath;
}
private boolean isLocalServer(ServerConfig config) {
// 简单的本地服务器检测逻辑
return "localhost".equals(config.getHost()) ||
"127.0.0.1".equals(config.getHost());
}
}

View File

@@ -0,0 +1,110 @@
package com.njcn.filesync.sftp;
import com.jcraft.jsch.*;
import com.njcn.filesync.config.ServerConfig;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.File;
import java.util.Vector;
/**
* Description:
* Date: 2025/10/16 下午 3:00【需求编号】
*SFTP客户端实现
* @author clam
* @version V1.0.0
*/
@Service
@Data
public class SftpClient implements AutoCloseable{
private static final Logger logger = LoggerFactory.getLogger(SftpClient.class);
private JSch jsch;
private Session session;
private ChannelSftp channel;
public void connect(ServerConfig config) throws JSchException {
jsch = new JSch();
// 设置私钥(如果提供)
if (config.getPrivateKeyPath() != null && !config.getPrivateKeyPath().isEmpty()) {
jsch.addIdentity(config.getPrivateKeyPath());
}
session = jsch.getSession(config.getUsername(), config.getHost(), config.getPort());
// 设置密码(如果提供)
if (config.getPassword() != null && !config.getPassword().isEmpty()) {
session.setPassword(config.getPassword());
}
// 配置SSH连接
java.util.Properties sshConfig = new java.util.Properties();
sshConfig.put("StrictHostKeyChecking", "no");
session.setConfig(sshConfig);
session.connect(30000); // 30秒超时
// 打开SFTP通道
channel = (ChannelSftp) session.openChannel("sftp");
channel.connect(30000);
}
public void uploadFile(String localFile, String remotePath) throws SftpException {
File file = new File(localFile);
String remoteFile = ensureRemotePath(remotePath, file.getName());
// 确保远程目录存在
createRemoteDirectory(remotePath);
// 上传文件
channel.put(localFile, remoteFile);
}
public ChannelSftp.LsEntry getRemoteFileInfo(String remotePath) throws SftpException {
@SuppressWarnings("unchecked")
Vector<ChannelSftp.LsEntry> files = channel.ls(remotePath);
if (files != null && !files.isEmpty()) {
return files.get(0);
}
return null;
}
private String ensureRemotePath(String remotePath, String fileName) {
if (remotePath.endsWith("/")) {
return remotePath + fileName;
} else {
return remotePath + "/" + fileName;
}
}
void createRemoteDirectory(String remotePath) throws SftpException {
// 处理Windows路径分隔符
String normalizedPath = remotePath.replace("\\", "/");
String[] directories = normalizedPath.split("/");
StringBuilder currentPath = new StringBuilder();
try {
channel.mkdir(normalizedPath);
logger.debug("Created remote directory: {}", currentPath);
} catch (SftpException e) {
if (e.id != ChannelSftp.SSH_FX_FAILURE) {
// 目录可能已存在,忽略这个错误
logger.debug("Directory may already exist: {}", currentPath);
}
}
}
@Override
public void close() {
if (channel != null) {
channel.disconnect();
}
if (session != null) {
session.disconnect();
}
}
}

View File

@@ -0,0 +1,47 @@
package com.njcn.job;
import com.njcn.device.pojo.po.PqDevice;
import com.njcn.device.service.PqDeviceService;
import com.njcn.filesync.config.*;
import com.njcn.filesync.service.FileSyncService;
import com.njcn.filesync.sftp.IncrementalFileSync;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @Author: cdf
* @CreateTime: 2025-09-19
* @Description: 定时任务
*/
@Component
@EnableScheduling
@RequiredArgsConstructor
@Slf4j
public class FileSyncJob {
private final SourceConfig sourceConfig;
private final TargetConfig targetConfig;
private final PqDeviceService pqDeviceService;
private final FileSyncService fileSyncService;
// 每10分钟执行
@Scheduled(cron = "0 0/10 * * * ? ")
public void UpHarmonicJob(){
fileSyncService.triggerCrossSync();
}
}

View File

@@ -0,0 +1,104 @@
#当前服务的基本信息
spring:
datasource:
druid:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.1.24:13306/pqsinfo_wuxi?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=Asia/Shanghai
username: root
password: njcnpqs
# url: jdbc:mysql://localhost:3306/pqs91001?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT
# username: root
# password: root
#初始化建立物理连接的个数、最小、最大连接数
initial-size: 5
min-idle: 5
max-active: 50
#获取连接最大等待时间,单位毫秒
max-wait: 60000
#链接保持空间而不被驱逐的最长时间,单位毫秒
min-evictable-idle-time-millis: 300000
validation-query: select 1
test-while-idle: true
test-on-borrow: false
test-on-return: false
pool-prepared-statements: true
max-pool-prepared-statement-per-connection-size: 20
servlet:
multipart:
max-file-size: 100MB
max-request-size: 100MB
#influxDB内容配置
influx:
url: http://192.168.1.103:18086
user: admin
password: 123456
database: pqsbase_wx
mapper-location: com.njcn.**.imapper
#mybatis配置信息
mybatis-plus:
mapper-locations: classpath*:com/njcn/**/mapping/*.xml
#别名扫描
type-aliases-package: com.njcn.product.**.pojo
configuration:
#驼峰命名
map-underscore-to-camel-case: true
#配置sql日志输出
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# #关闭日志输出
# log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
global-config:
db-config:
#指定主键生成策略
id-type: assign_uuid
db:
type: mysql
#文件位置配置
business:
#处理波形数据位置
wavePath: D://comtrade
#wavePath: /usr/local/comtrade
#处理临时数据
#tempPath: D://file
tempPath: /usr/local/file
#文件存储的方式 1.本地
file:
storage: 1
#localStoragePath: /usr/local/localStoragePath
localStoragePath: D://localStoragePath
#oss服务器配置
min:
io:
endpoint: http://192.168.1.22:9009
accessKey: minio
secretKey: minio@123
bucket: excelreport
#华为obs服务器配置
huawei:
access-key: J9GS9EA79PZ60OK23LWP
security-key: BirGrAFDSLxU8ow5fffyXgZRAmMRb1R1AdqCI60d
obs:
bucket: test-8601
endpoint: https://obs.cn-east-3.myhuaweicloud.com
# 单位为秒
expire: 3600
server:
source:
host: 192.168.1.63
port: 22
username: root
password: dnzl@#001
basePath: /
privateKeyPath:
target:
host: 192.168.1.103
port: 22
username: root
password: dnzl@#001
basePath: /usr/local/wuxi
privateKeyPath:

View File

@@ -0,0 +1,84 @@
#当前服务的基本信息
microservice:
ename: file-sync
name: file-sync
server:
port: 19002
spring:
application:
name: file-sync
profiles:
active: wuxi_dev
jackson:
time-zone: GMT+8
date-format: yyyy-MM-dd HH:mm:ss
locale: zh_CN
serialization:
# 格式化输出
indent_output: false
servlet:
multipart:
max-file-size: 100MB
max-request-size: 100MB
#mybatis配置信息
mybatis-plus:
mapper-locations: classpath*:com/njcn/**/mapping/*.xml
#别名扫描
type-aliases-package: com.njcn.filesync.**.pojo
configuration:
#驼峰命名
map-underscore-to-camel-case: true
#配置sql日志输出
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# #关闭日志输出
# log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
global-config:
db-config:
#指定主键生成策略
id-type: assign_uuid
db:
type: mysql
#文件位置配置
business:
#处理波形数据位置
wavePath: D://comtrade
#wavePath: /usr/local/comtrade
#处理临时数据
#tempPath: D://file
tempPath: /usr/local/file
#文件存储的方式 1.本地
file:
storage: 1
#localStoragePath: /usr/local/localStoragePath
localStoragePath: f://localStoragePath
#oss服务器配置
min:
io:
endpoint: http://192.168.1.22:9009
accessKey: minio
secretKey: minio@123
bucket: excelreport
#华为obs服务器配置
huawei:
access-key: J9GS9EA79PZ60OK23LWP
security-key: BirGrAFDSLxU8ow5fffyXgZRAmMRb1R1AdqCI60d
obs:
bucket: test-8601
endpoint: https://obs.cn-east-3.myhuaweicloud.com
# 单位为秒
expire: 3600
#线程池配置信息
threadPool:
corePoolSize: 10
maxPoolSize: 20
queueCapacity: 500
keepAliveSeconds: 60
file:
upload-dir: D:/carry

View File

View File

@@ -11,6 +11,8 @@
<module>mysql-data</module>
<module>influx-data</module>
<module>manufact_influx_data</module>
<module>filesync</module>
</modules>
<packaging>pom</packaging>
<name>灿能数据迁移系统</name>