辽宁二三区数据迁移
This commit is contained in:
72
relational_migration/relational_target/pom.xml
Normal file
72
relational_migration/relational_target/pom.xml
Normal file
@@ -0,0 +1,72 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>com.njcn</groupId>
|
||||
<artifactId>relational_migration</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</parent>
|
||||
|
||||
|
||||
<artifactId>relational_target</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.njcn</groupId>
|
||||
<artifactId>relational_comm</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- SFTP 文件传输依赖 -->
|
||||
<dependency>
|
||||
<groupId>com.jcraft</groupId>
|
||||
<artifactId>jsch</artifactId>
|
||||
<version>0.1.55</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.bytebuddy</groupId>
|
||||
<artifactId>byte-buddy</artifactId>
|
||||
<version>1.12.10</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-security</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
<build>
|
||||
<finalName>relation_target</finalName>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.8.1</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
<encoding>UTF-8</encoding>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<!-- 加上下面这段 -->
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>repackage</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.njcn.relational.ann;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* 同步表注解
|
||||
* 标识这个Service同步哪张表
|
||||
*/
|
||||
@Target(ElementType.TYPE)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface SyncTable {
|
||||
|
||||
/**
|
||||
* 表名
|
||||
*/
|
||||
String value();
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.njcn.relational.config;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
|
||||
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
|
||||
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
|
||||
|
||||
@Configuration
|
||||
@EnableWebSecurity
|
||||
public class Security extends WebSecurityConfigurerAdapter {
|
||||
|
||||
|
||||
@Override
|
||||
protected void configure(HttpSecurity http) throws Exception {
|
||||
http.httpBasic().and().authorizeRequests().anyRequest().authenticated().and().csrf().disable();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,162 @@
|
||||
package com.njcn.relational.controller;
|
||||
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.njcn.relational.service.SyncTableConfigService;
|
||||
import com.njcn.relational.service.SyncTableParseService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.format.DateTimeParseException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author: cdf
|
||||
* @CreateTime: 2026-05-26
|
||||
* @Description:
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/source")
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class DataTransportController {
|
||||
|
||||
private final SyncTableConfigService syncTableConfigService;
|
||||
|
||||
private final SyncTableParseService syncTableParseService;
|
||||
|
||||
@Value("${spring.profiles.active:query}")
|
||||
private String serverType;
|
||||
|
||||
@GetMapping("export")
|
||||
public void export(@RequestParam(required = false)String date){
|
||||
if(StrUtil.isBlank(date)){
|
||||
date = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern(DatePattern.PURE_DATE_PATTERN));
|
||||
}else {
|
||||
if(!isValidDate(date)){
|
||||
log.error("参数时间格式错误"+date);
|
||||
return;
|
||||
}
|
||||
}
|
||||
syncTableConfigService.syncAllFullTables(date);
|
||||
}
|
||||
|
||||
@GetMapping("import")
|
||||
public void importData(@RequestParam(required = false)String date){
|
||||
if(StrUtil.isBlank(date)){
|
||||
date = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern(DatePattern.PURE_DATE_PATTERN));
|
||||
}else {
|
||||
if(!isValidDate(date)){
|
||||
log.error("参数时间格式错误"+date);
|
||||
return;
|
||||
}
|
||||
}
|
||||
try {
|
||||
syncTableParseService.syncFullTables(date);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping("test")
|
||||
public void test(){
|
||||
syncTableConfigService.test();
|
||||
}
|
||||
|
||||
@GetMapping("testR")
|
||||
public void testR(){
|
||||
syncTableParseService.testR();
|
||||
}
|
||||
|
||||
@GetMapping("getRemoteTwoAll")
|
||||
public List<String> getRemoteTwoAll(){
|
||||
try {
|
||||
return syncTableConfigService.getPendingFiles();
|
||||
} catch (Exception e) {
|
||||
log.error("最终捕获二区异常",e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping("getRemoteAll")
|
||||
public List<String> getAll(){
|
||||
try {
|
||||
return syncTableParseService.getPendingFiles();
|
||||
} catch (Exception e) {
|
||||
log.error("最终捕获异常",e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping("removeTwo")
|
||||
public String removeTwo(@RequestParam Integer a){
|
||||
try {
|
||||
syncTableConfigService.cleanLocalFiles(a);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return "成功";
|
||||
}
|
||||
|
||||
|
||||
@GetMapping("removeThree")
|
||||
public String removeThree(@RequestParam Integer a){
|
||||
try {
|
||||
syncTableParseService.cleanLocalFiles(a);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return "成功";
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 删除横向隔离设备公共摆渡区数据
|
||||
* @author cdf
|
||||
* @date 2026/6/10
|
||||
*/
|
||||
@GetMapping("removeRemote")
|
||||
public String removeRemote(@RequestParam Integer a){
|
||||
|
||||
return "成功";
|
||||
}
|
||||
|
||||
@GetMapping("getConfig")
|
||||
public Map<String, Object> getConfig(){
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
if(serverType.contains("query")){
|
||||
config.put("serverType", "source");
|
||||
}else {
|
||||
config.put("serverType", "target");
|
||||
}
|
||||
config.put("serverTypeName", serverType.contains("query") ? "二区源服务器" : "三区目标服务器");
|
||||
return config;
|
||||
}
|
||||
|
||||
public boolean isValidDate(String dateStr) {
|
||||
if (dateStr == null || dateStr.length() != 8) {
|
||||
return false;
|
||||
}
|
||||
|
||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
try {
|
||||
LocalDate.parse(dateStr, formatter);
|
||||
return true;
|
||||
} catch (DateTimeParseException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
package com.njcn.relational.job;
|
||||
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import com.njcn.relational.service.SyncTableConfigService;
|
||||
import com.njcn.relational.service.SyncTableParseService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
/**
|
||||
* data-migration
|
||||
*
|
||||
* @author cdf
|
||||
* @date 2026/5/29
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Profile("insert_up")
|
||||
public class importJob {
|
||||
|
||||
|
||||
private final SyncTableParseService syncTableParseService;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 三区导入数据
|
||||
* 定时导出数据 - 每天早上3点执行
|
||||
* @author cdf
|
||||
* @date 2026/5/29
|
||||
*/
|
||||
@Profile("insert_up")
|
||||
@Scheduled(cron = "0 55 4 * * ?")
|
||||
public void threeImport(){
|
||||
String date = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern(DatePattern.PURE_DATE_PATTERN));
|
||||
syncTableParseService.syncFullTables(date);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 定时清楚本地数据,默认保留一周
|
||||
* 定时导出数据 - 每天早上3点执行
|
||||
* @author cdf
|
||||
* @date 2026/5/29
|
||||
*/
|
||||
@Scheduled(cron = "0 44 5 * * ?")
|
||||
@Profile("insert_up")
|
||||
public void removeThreeLocal(){
|
||||
syncTableParseService.cleanLocalFiles(7);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package com.njcn.relational.job;
|
||||
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import com.njcn.relational.service.SyncTableConfigService;
|
||||
import com.njcn.relational.service.SyncTableParseService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
/**
|
||||
* data-migration
|
||||
*
|
||||
* @author cdf
|
||||
* @date 2026/5/29
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Profile("query_up")
|
||||
public class reportJob {
|
||||
|
||||
private final SyncTableConfigService syncTableConfigService;
|
||||
|
||||
|
||||
/**
|
||||
* 二区查询导出数据
|
||||
* 定时导出数据 - 每天早上2点执行
|
||||
* @author cdf
|
||||
* @date 2026/5/29
|
||||
*/
|
||||
@Scheduled(cron = "0 40 4 * * ?")
|
||||
public void twoExport(){
|
||||
String date = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern(DatePattern.PURE_DATE_PATTERN));
|
||||
syncTableConfigService.syncAllFullTables(date);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 定时清楚本地数据,默认保留一周
|
||||
* 定时导出数据 - 每天早上3点执行
|
||||
* @author cdf
|
||||
* @date 2026/5/29
|
||||
*/
|
||||
@Scheduled(cron = "0 33 5 * * ?")
|
||||
public void removeTwoLocal(){
|
||||
syncTableConfigService.cleanLocalFiles(7);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.njcn.relational.serializer;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.JsonSerializer;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Blob;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Base64;
|
||||
|
||||
/**
|
||||
* Blob 字段 JSON 序列化器
|
||||
* 将数据库 Blob 类型转换为 Base64 字符串
|
||||
*/
|
||||
public class BlobSerializer extends JsonSerializer<Blob> {
|
||||
|
||||
@Override
|
||||
public void serialize(Blob blob, JsonGenerator gen, SerializerProvider serializers)
|
||||
throws IOException {
|
||||
|
||||
if (blob == null) {
|
||||
gen.writeNull();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 获取 Blob 字节数组
|
||||
byte[] bytes = blob.getBytes(1, (int) blob.length());
|
||||
// 转换为 Base64
|
||||
String base64 = Base64.getEncoder().encodeToString(bytes);
|
||||
gen.writeString(base64);
|
||||
} catch (SQLException e) {
|
||||
gen.writeNull();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.njcn.relational.serializer;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.JsonSerializer;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Clob;
|
||||
import java.sql.SQLException;
|
||||
|
||||
/**
|
||||
* Clob 字段 JSON 序列化器
|
||||
* 将数据库 Clob 类型转换为字符串
|
||||
*/
|
||||
public class ClobSerializer extends JsonSerializer<Clob> {
|
||||
|
||||
@Override
|
||||
public void serialize(Clob clob, JsonGenerator gen, SerializerProvider serializers)
|
||||
throws IOException {
|
||||
|
||||
if (clob == null) {
|
||||
gen.writeNull();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 获取 Clob 字符串内容
|
||||
String content = clob.getSubString(1, (int) clob.length());
|
||||
gen.writeString(content);
|
||||
} catch (SQLException e) {
|
||||
gen.writeNull();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,490 @@
|
||||
package com.njcn.relational.service;
|
||||
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import com.njcn.relational.mapper.DynamicSyncMapper;
|
||||
import com.njcn.relational.mapper.SyncTableConfigMapper;
|
||||
import com.njcn.relational.pojo.bo.UploadResult;
|
||||
import com.njcn.relational.pojo.po.SyncTableConfig;
|
||||
import com.njcn.relational.serializer.BlobSerializer;
|
||||
import com.njcn.relational.serializer.ClobSerializer;
|
||||
import com.njcn.relational.utils.SftpUploadUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.io.*;
|
||||
import java.sql.Blob;
|
||||
import java.sql.Clob;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
public class SyncTableConfigService extends ServiceImpl<SyncTableConfigMapper, SyncTableConfig> {
|
||||
|
||||
@Autowired
|
||||
private DynamicSyncMapper dynamicSyncMapper;
|
||||
|
||||
@Value("${sync.export.remotePath}")
|
||||
private String remotePath;
|
||||
|
||||
@Value("${sync.export.localPath}")
|
||||
private String localPath;
|
||||
|
||||
@Value("${sync.ip}")
|
||||
private String ip;
|
||||
|
||||
@Value("${sync.port:22}")
|
||||
private Integer port;
|
||||
|
||||
@Value("${sync.username}")
|
||||
private String username;
|
||||
|
||||
@Value("${sync.password}")
|
||||
private String password;
|
||||
|
||||
// 新增:表名 -> 时间字段 缓存
|
||||
private Map<String, String> timeColumnCache = new HashMap<>();
|
||||
|
||||
private static final int BATCH_SIZE = 10000;
|
||||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
|
||||
private static final DateTimeFormatter FILE_DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
|
||||
// JSON序列化工具(自动处理时间、换行符等)
|
||||
private final ObjectMapper objectMapper = new ObjectMapper()
|
||||
.registerModule(new JavaTimeModule())
|
||||
.registerModule(new SimpleModule() {{
|
||||
addSerializer(Blob.class, new BlobSerializer());
|
||||
addSerializer(Clob.class, new ClobSerializer());
|
||||
}})
|
||||
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
|
||||
.enable(SerializationFeature.INDENT_OUTPUT)
|
||||
.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));; // 格式化输出,可读性好
|
||||
|
||||
// 缓存当前Schema
|
||||
private String currentSchema;
|
||||
|
||||
// 记录本次生成的所有文件
|
||||
private final List<String> generatedFiles = new ArrayList<>();
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
currentSchema = dynamicSyncMapper.getCurrentUser();
|
||||
log.info("当前数据库Schema: {}", currentSchema);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有启用的全量表(从配置表)
|
||||
*/
|
||||
public List<SyncTableConfig> getEnabledFullTables() {
|
||||
LambdaQueryWrapper<SyncTableConfig> wrapper = new LambdaQueryWrapper<>();
|
||||
wrapper.eq(SyncTableConfig::getEnabled, 1)
|
||||
.eq(SyncTableConfig::getSyncMode, "FULL")
|
||||
.orderByAsc(SyncTableConfig::getSortOrder);
|
||||
return this.list(wrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有启用的增量表(从配置表)
|
||||
*/
|
||||
public List<SyncTableConfig> getEnabledIncrementTables() {
|
||||
LambdaQueryWrapper<SyncTableConfig> wrapper = new LambdaQueryWrapper<>();
|
||||
wrapper.eq(SyncTableConfig::getEnabled, 1)
|
||||
.eq(SyncTableConfig::getSyncMode, "INC")
|
||||
.orderByAsc(SyncTableConfig::getSortOrder);
|
||||
return this.list(wrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 确保本地目录存在
|
||||
*/
|
||||
private void ensureLocalDirectory() {
|
||||
File dir = new File(localPath);
|
||||
if (!dir.exists()) {
|
||||
dir.mkdirs();
|
||||
log.info("创建目录: {}", localPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 全量同步所有表
|
||||
*/
|
||||
public void syncAllFullTables(String dateStr) {
|
||||
List<SyncTableConfig> tables = getEnabledFullTables();
|
||||
log.info("========== 全量同步开始,共 {} 张表 ==========", tables.size());
|
||||
|
||||
// 确保目录存在
|
||||
ensureLocalDirectory();
|
||||
generatedFiles.clear();
|
||||
|
||||
for (SyncTableConfig config : tables) {
|
||||
String tableName = config.getTableName();
|
||||
try {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
/*IService<?> service = registry.getServiceByTableName(tableName);
|
||||
if (service != null) {
|
||||
String filePath = exportFullTableWithEntity(service, tableName, dateStr);
|
||||
log.info("表 {} 使用实体方式导出完成", tableName);
|
||||
} else {*/
|
||||
String filePath = exportFullTableToJson(tableName, dateStr);
|
||||
log.info("表 {} 使用Map方式导出完成", tableName);
|
||||
// }
|
||||
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
log.info("表 {} 全量导出完成,耗时 {} ms", tableName, duration);
|
||||
} catch (Exception e) {
|
||||
log.error("表 {} 全量导出失败", tableName, e);
|
||||
}
|
||||
}
|
||||
syncAllIncrementTables(dateStr);
|
||||
}
|
||||
|
||||
/**
|
||||
* 增量同步所有表
|
||||
*/
|
||||
public void syncAllIncrementTables(String dateStr) {
|
||||
List<SyncTableConfig> tables = getEnabledIncrementTables();
|
||||
timeColumnCache = tables.stream().filter(it-> StrUtil.isNotBlank(it.getTimeColumn())).collect(Collectors.toMap(SyncTableConfig::getTableName,SyncTableConfig::getTimeColumn));
|
||||
log.info("========== 增量同步开始,共 {} 张表 ==========", tables.size());
|
||||
for (SyncTableConfig config : tables) {
|
||||
String tableName = config.getTableName();
|
||||
try {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
/*IService<?> service = registry.getServiceByTableName(tableName);
|
||||
if (service != null) {
|
||||
String filePath = exportIncrementWithEntity(service, tableName, dateStr);
|
||||
log.info("表 {} 使用实体方式导出完成", tableName);
|
||||
} else {*/
|
||||
String filePath = exportTodayDataToJson(tableName, dateStr);
|
||||
log.info("表 {} 使用Map方式导出完成", tableName);
|
||||
// }
|
||||
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
log.info("表 {} 增量导出完成,耗时 {} ms", tableName, duration);
|
||||
} catch (Exception e) {
|
||||
log.error("表 {} 增量同步失败", tableName, e);
|
||||
}
|
||||
}
|
||||
|
||||
// 推送所有文件到横向隔离设备
|
||||
if (!generatedFiles.isEmpty()) {
|
||||
uploadAllFiles();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 上传所有生成的文件(批量上传)
|
||||
*/
|
||||
private void uploadAllFiles() {
|
||||
if (generatedFiles.isEmpty()) {
|
||||
log.info("没有文件需要上传");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
List<UploadResult> resultList = SftpUploadUtil.batchUploadFiles(
|
||||
ip, port, username, password, generatedFiles, remotePath
|
||||
);
|
||||
|
||||
// 统计上传结果
|
||||
long successCount = resultList.stream().filter(UploadResult::isSuccess).count();
|
||||
long failCount = resultList.size() - successCount;
|
||||
|
||||
log.info("批量上传完成 - 成功: {}, 失败: {}, 总计: {}", successCount, failCount, resultList.size());
|
||||
// 记录失败的文件
|
||||
for (UploadResult result : resultList) {
|
||||
if (!result.isSuccess()) {
|
||||
log.error("上传失败文件: {}, 原因: {}", result.getLocalFilePath(), result.getErrorMessage());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("批量上传失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Map方式全量导出为JSON(保存为.txt文件)
|
||||
*/
|
||||
@Transactional
|
||||
public String exportFullTableToJson(String tableName, String dateStr) throws IOException {
|
||||
String fileName = String.format("%s_FULL_%s.txt", tableName, dateStr);
|
||||
String filePath = localPath + fileName;
|
||||
|
||||
log.info("开始导出全量表: {} -> {}", tableName, filePath);
|
||||
|
||||
List<Map<String, Object>> allData = new ArrayList<>();
|
||||
int offset = 0;
|
||||
|
||||
while (true) {
|
||||
List<Map<String, Object>> batchData = dynamicSyncMapper.selectPage(
|
||||
tableName, BATCH_SIZE, offset);
|
||||
|
||||
if (batchData.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
|
||||
allData.addAll(batchData);
|
||||
log.info("表 {} 已读取 {} 条记录", tableName, allData.size());
|
||||
|
||||
if (batchData.size() < BATCH_SIZE) {
|
||||
break;
|
||||
}
|
||||
offset += BATCH_SIZE;
|
||||
}
|
||||
|
||||
// 写入JSON格式到.txt文件
|
||||
try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath))) {
|
||||
objectMapper.writeValue(bos, allData);
|
||||
}
|
||||
|
||||
|
||||
|
||||
// 记录生成的文件
|
||||
generatedFiles.add(filePath);
|
||||
|
||||
log.info("表 {} 全量导出完成,共 {} 条记录,文件: {}", tableName, allData.size(), filePath);
|
||||
return filePath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Map方式增量导出为JSON(保存为.txt文件)
|
||||
*/
|
||||
@Transactional
|
||||
public String exportTodayDataToJson(String tableName, String dateStr) throws IOException {
|
||||
|
||||
// 1. 判断表属于哪种周期:D/M/Q/Y
|
||||
String periodType = getTablePeriodType(tableName);
|
||||
if (periodType == null) {
|
||||
log.warn("表 {} 无法识别周期类型,跳过增量导出", tableName);
|
||||
return null;
|
||||
}
|
||||
|
||||
// 2. 根据周期构建 开始/结束 时间
|
||||
Map<String, String> timeRange = buildPeriodTimeRange(periodType, dateStr);
|
||||
String dateStart = timeRange.get("start");
|
||||
String dateEnd = timeRange.get("end");
|
||||
|
||||
log.info("导出表 {} 数据: {} 至 {}", tableName, dateStart, dateEnd);
|
||||
|
||||
String timeColumn = getTimeColumn(tableName);
|
||||
|
||||
List<Map<String, Object>> queryData = dynamicSyncMapper.selectTodayData(
|
||||
tableName, timeColumn, dateStart, dateEnd);
|
||||
|
||||
if (queryData.isEmpty()) {
|
||||
log.info("表 {} {} - {}无数据", tableName,dateStart,dateEnd);
|
||||
return null;
|
||||
}
|
||||
|
||||
String fileName = String.format("%s_INC_%s.txt", tableName, dateStr);
|
||||
String filePath = localPath + fileName;
|
||||
|
||||
// 写入JSON格式到.txt文件
|
||||
try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath))) {
|
||||
objectMapper.writeValue(bos, queryData);
|
||||
}
|
||||
|
||||
// 记录生成的文件
|
||||
generatedFiles.add(filePath);
|
||||
|
||||
log.info("表 {} {}数据导出完成,共 {} 条记录,文件: {}", tableName,dateStr, queryData.size(), filePath);
|
||||
return filePath;
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 从表名识别周期类型
|
||||
* D=天, M=月, Q=季, Y=年
|
||||
*/
|
||||
private String getTablePeriodType(String tableName) {
|
||||
|
||||
if (tableName.endsWith("_D") || tableName.equalsIgnoreCase("R_MP_V_THD")) return "D";
|
||||
if (tableName.endsWith("_M")) return "M";
|
||||
if (tableName.endsWith("_Q")) return "Q";
|
||||
if (tableName.endsWith("_Y")) return "Y";
|
||||
return null;
|
||||
}
|
||||
/**
|
||||
* 根据周期 & 入参日期,计算【当前周期】时间范围(JDK8 兼容)
|
||||
*/
|
||||
private Map<String, String> buildPeriodTimeRange(String periodType, String dateStr) {
|
||||
DateTime baseDate = DateUtil.parse(dateStr, DatePattern.PURE_DATE_PATTERN);
|
||||
String start;
|
||||
String end;
|
||||
|
||||
// JDK8 标准 switch
|
||||
switch (periodType) {
|
||||
case "D":
|
||||
// 入参日期 当天
|
||||
start = DateUtil.format(DateUtil.beginOfDay(baseDate), DatePattern.NORM_DATETIME_PATTERN);
|
||||
end = DateUtil.format(DateUtil.endOfDay(baseDate), DatePattern.NORM_DATETIME_PATTERN);
|
||||
break;
|
||||
case "M":
|
||||
// 入参日期 所在整月
|
||||
start = DateUtil.format(DateUtil.beginOfMonth(baseDate), DatePattern.NORM_DATETIME_PATTERN);
|
||||
end = DateUtil.format(DateUtil.endOfMonth(baseDate), DatePattern.NORM_DATETIME_PATTERN);
|
||||
break;
|
||||
case "Q":
|
||||
// 入参日期 所在整季度
|
||||
start = DateUtil.format(DateUtil.beginOfQuarter(baseDate), DatePattern.NORM_DATETIME_PATTERN);
|
||||
end = DateUtil.format(DateUtil.endOfQuarter(baseDate), DatePattern.NORM_DATETIME_PATTERN);
|
||||
break;
|
||||
case "Y":
|
||||
// 入参日期 所在整年
|
||||
start = DateUtil.format(DateUtil.beginOfYear(baseDate), DatePattern.NORM_DATETIME_PATTERN);
|
||||
end = DateUtil.format(DateUtil.endOfYear(baseDate), DatePattern.NORM_DATETIME_PATTERN);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("不支持的周期类型:" + periodType);
|
||||
}
|
||||
|
||||
Map<String, String> map = new HashMap<>(2);
|
||||
map.put("start", start);
|
||||
map.put("end", end);
|
||||
return map;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 获取时间字段
|
||||
*/
|
||||
private String getTimeColumn(String tableName) {
|
||||
String createTitle = timeColumnCache.get(tableName);
|
||||
if(Objects.isNull(createTitle)){
|
||||
switch (tableName) {
|
||||
case "R_STAT_DATA_FLICKER_D":
|
||||
case "R_STAT_DATA_FLUC_D":
|
||||
case "R_STAT_DATA_HARMPHASIC_I_D":
|
||||
case "R_STAT_DATA_HARMPHASIC_V_D":
|
||||
case "R_STAT_DATA_HARMPOWER_P_D":
|
||||
case "R_STAT_DATA_HARMPOWER_Q_D":
|
||||
case "R_STAT_DATA_HARMPOWER_S_D":
|
||||
case "R_STAT_DATA_HARMRATE_I_D":
|
||||
case "R_STAT_DATA_HARMRATE_V_D":
|
||||
case "R_STAT_DATA_INHARM_I_D":
|
||||
case "R_STAT_DATA_INHARM_V_D":
|
||||
case "R_STAT_DATA_I_D":
|
||||
case "R_STAT_DATA_PLT_D":
|
||||
case "R_STAT_DATA_V_D":
|
||||
return "TIME";
|
||||
// 新增表
|
||||
case "R_STAT_COMASSES_D":
|
||||
case "R_STAT_ASSES_D":
|
||||
case "R_STAT_LIMIT_RATE_D":
|
||||
case "R_STAT_ONLINERATE_D":
|
||||
case "R_STAT_INTEGRITY_D":
|
||||
case "R_STAT_ORG_INTEGRITY_D":
|
||||
case "R_STAT_LIMIT_RATE_DETAIL_D":
|
||||
case "R_STAT_LIMIT_TARGET_D":
|
||||
return "TIME_ID";
|
||||
case "R_MP_POLLUTION_D":
|
||||
case "R_MP_V_THD":
|
||||
case "R_STAT_POLLUTION_ORG_D":
|
||||
case "R_STAT_POLLUTION_ORG_M":
|
||||
case "R_STAT_POLLUTION_ORG_Q":
|
||||
case "R_STAT_POLLUTION_ORG_Y":
|
||||
case "R_STAT_POLLUTION_SUBSTATION_D":
|
||||
case "R_STAT_POLLUTION_SUBSTATION_M":
|
||||
case "R_STAT_POLLUTION_SUBSTATION_Q":
|
||||
case "R_STAT_POLLUTION_SUBSTATION_Y":
|
||||
return "DATA_DATE";
|
||||
default:
|
||||
log.warn("表 {} 未配置时间字段,跳过", tableName);
|
||||
return "CREATE_TIME";
|
||||
}
|
||||
}else {
|
||||
return createTitle;
|
||||
}
|
||||
}
|
||||
|
||||
public void test(){
|
||||
try {
|
||||
// 确保目录存在
|
||||
File localDir = new File(localPath);
|
||||
if (!localDir.exists()) {
|
||||
localDir.mkdirs();
|
||||
}
|
||||
|
||||
// 生成txt文件
|
||||
String filePath = localPath + "test.txt";
|
||||
try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) {
|
||||
writer.write("test");
|
||||
}
|
||||
|
||||
// 上传到SFTP服务器
|
||||
SftpUploadUtil.uploadFile(ip, port, username, password, filePath, remotePath);
|
||||
|
||||
log.info("test.txt文件生成并上传成功");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<String> getPendingFiles() {
|
||||
List<String> matchedFiles = new ArrayList<>();
|
||||
Vector<?> files;
|
||||
try {
|
||||
// 列目录也纳入异常捕获
|
||||
files = SftpUploadUtil.listFiles(ip, port, username, password, remotePath);
|
||||
for (Object obj : files) {
|
||||
if (obj instanceof com.jcraft.jsch.ChannelSftp.LsEntry) {
|
||||
com.jcraft.jsch.ChannelSftp.LsEntry entry = (com.jcraft.jsch.ChannelSftp.LsEntry) obj;
|
||||
String fileName = entry.getFilename();
|
||||
matchedFiles.add(fileName);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("获取待处理文件列表失败,远端目录:{}", remotePath, e);
|
||||
return new ArrayList<>();
|
||||
}
|
||||
return matchedFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理本地已处理过的文件
|
||||
*/
|
||||
public void cleanLocalFiles(int daysToKeep) {
|
||||
File localDir = new File(localPath);
|
||||
if (!localDir.exists()) {
|
||||
return;
|
||||
}
|
||||
|
||||
long cutoffTime = System.currentTimeMillis() - (daysToKeep * 24L * 60 * 60 * 1000);
|
||||
File[] files = localDir.listFiles();
|
||||
|
||||
if (files != null) {
|
||||
int deletedCount = 0;
|
||||
for (File file : files) {
|
||||
if (file.isFile() && file.lastModified() < cutoffTime) {
|
||||
file.delete();
|
||||
deletedCount++;
|
||||
}
|
||||
}
|
||||
log.info("清理了10.11.7.5二区服务器 {} 个七天前的本地txt文件", deletedCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,796 @@
|
||||
package com.njcn.relational.service;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import com.njcn.relational.mapper.DynamicSyncMapper;
|
||||
import com.njcn.relational.mapper.SyncTableConfigMapper;
|
||||
import com.njcn.relational.pojo.bo.DownloadResult;
|
||||
import com.njcn.relational.pojo.po.SyncTableConfig;
|
||||
import com.njcn.relational.utils.SftpUploadUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.io.*;
|
||||
import java.nio.file.*;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
public class SyncTableParseService {
|
||||
|
||||
@Autowired
|
||||
private DynamicSyncMapper dynamicSyncMapper;
|
||||
|
||||
@Autowired
|
||||
private SyncTableConfigMapper syncTableConfigMapper;
|
||||
|
||||
// 缓存当前Schema
|
||||
private String currentSchema;
|
||||
|
||||
@Value("${sync.import.localPath}")
|
||||
private String localPath;
|
||||
|
||||
@Value("${sync.import.remotePath}")
|
||||
private String remotePath;
|
||||
|
||||
@Value("${sync.ip}")
|
||||
private String ip;
|
||||
|
||||
@Value("${sync.port:22}")
|
||||
private Integer port;
|
||||
|
||||
@Value("${sync.username}")
|
||||
private String username;
|
||||
|
||||
@Value("${sync.password}")
|
||||
private String password;
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate targetJdbcTemplate;
|
||||
|
||||
private static final int BATCH_SIZE = 10000;
|
||||
private static final DateTimeFormatter FILE_DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
|
||||
// JSON解析工具
|
||||
private final ObjectMapper objectMapper = new ObjectMapper()
|
||||
.registerModule(new JavaTimeModule())
|
||||
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
|
||||
.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai")); // 格式化输出,可读性好;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
currentSchema = dynamicSyncMapper.getCurrentUser();
|
||||
log.info("当前数据库Schema: {}", currentSchema);
|
||||
}
|
||||
|
||||
private Map<String, String> syncData(){
|
||||
// 加载表配置到缓存
|
||||
Map<String, String> tableConfigCache = new HashMap<>();
|
||||
LambdaQueryWrapper<SyncTableConfig> wrapper = new LambdaQueryWrapper<>();
|
||||
wrapper.eq(SyncTableConfig::getEnabled, 1).isNotNull(SyncTableConfig::getBlobColumns);
|
||||
List<SyncTableConfig> configs = syncTableConfigMapper.selectList(wrapper);
|
||||
for (SyncTableConfig config : configs) {
|
||||
tableConfigCache.put(config.getTableName().toUpperCase(), config.getBlobColumns());
|
||||
}
|
||||
return tableConfigCache;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量同步所有表的数据文件(使用批量下载并删除)
|
||||
*/
|
||||
public void batchSyncAllTables(String dataStr) {
|
||||
log.info("========== 开始批量同步数据{} ==========",LocalDateTime.now());
|
||||
|
||||
// 缓存表配置
|
||||
Map<String, String> tableConfigCache = syncData();
|
||||
log.info("加载表配置缓存完成,共 {} 张表", tableConfigCache.size());
|
||||
|
||||
// 确保本地目录存在
|
||||
ensureLocalDirectory();
|
||||
|
||||
try {
|
||||
// 1. 获取待下载的文件列表
|
||||
List<String> remoteFilePaths = getRemoteFilePaths(dataStr);
|
||||
|
||||
if (remoteFilePaths.isEmpty()) {
|
||||
log.info("没有从横向隔离设备找到匹配的文件");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("找到 {} 个文件待处理", remoteFilePaths.size());
|
||||
|
||||
// 2. 批量下载并删除远程文件
|
||||
List<DownloadResult> downloadResults = SftpUploadUtil.batchDownloadAndDelete(
|
||||
ip, port, username, password,
|
||||
remoteFilePaths, localPath, true // true = 下载后删除远程文件
|
||||
);
|
||||
|
||||
// 3. 统计下载结果
|
||||
long downloadSuccessCount = downloadResults.stream().filter(DownloadResult::isSuccess).count();
|
||||
long downloadFailCount = downloadResults.size() - downloadSuccessCount;
|
||||
log.info("批量下载完成 - 成功: {}, 失败: {}", downloadSuccessCount, downloadFailCount);
|
||||
|
||||
// 记录下载失败的文件
|
||||
for (DownloadResult result : downloadResults) {
|
||||
if (!result.isSuccess()) {
|
||||
log.error("下载失败: {}, 原因: {}", result.getRemoteFilePath(), result.getErrorMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 处理下载成功的文件(导入数据)
|
||||
List<DownloadResult> successfulDownloads = downloadResults.stream()
|
||||
.filter(DownloadResult::isSuccess)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (successfulDownloads.isEmpty()) {
|
||||
log.warn("没有成功下载的文件,停止处理");
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 批量导入数据
|
||||
int importSuccessCount = 0;
|
||||
int importFailCount = 0;
|
||||
|
||||
for (DownloadResult downloadResult : successfulDownloads) {
|
||||
try {
|
||||
String tableName = extractTableNameFromFile(downloadResult.getRemoteFilePath());
|
||||
String localFilePath = downloadResult.getLocalFilePath();
|
||||
|
||||
log.info("开始导入表: {}, 文件: {}", tableName, localFilePath);
|
||||
importJsonToTable(tableName, localFilePath,tableConfigCache);
|
||||
|
||||
importSuccessCount++;
|
||||
log.info("表 {} 导入成功", tableName);
|
||||
|
||||
// 导入成功后删除本地文件
|
||||
deleteLocalFile(localFilePath);
|
||||
|
||||
} catch (Exception e) {
|
||||
importFailCount++;
|
||||
log.error("导入失败: {}", downloadResult.getRemoteFilePath(), e);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("批量导入完成 - 成功: {}, 失败: {}", importSuccessCount, importFailCount);
|
||||
log.info("========== 数据批量同步完成 ==========");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("批量同步失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取远程文件路径列表
|
||||
*/
|
||||
private List<String> getRemoteFilePaths(String dateStr) throws Exception {
|
||||
// 获取远程目录下所有文件
|
||||
Vector<?> files = SftpUploadUtil.listFiles(ip, port, username, password, remotePath);
|
||||
|
||||
// 列出远程目录下所有匹配的文件
|
||||
String remoteFilePattern = buildRemoteFilePattern("FULL", dateStr);
|
||||
List<String> matchedFiles = listRemoteFiles(remoteFilePattern,files);
|
||||
String remoteFilePatternInc = buildRemoteFilePattern("INC", dateStr);
|
||||
List<String> matchedFilesInc = listRemoteFiles(remoteFilePatternInc,files);
|
||||
|
||||
// 转换为完整路径
|
||||
List<String> remoteFilePaths = new ArrayList<>();
|
||||
for (String fileName : matchedFiles) {
|
||||
remoteFilePaths.add(remotePath + (remotePath.endsWith("/") ? "" : "/") + fileName);
|
||||
}
|
||||
for (String fileName : matchedFilesInc) {
|
||||
remoteFilePaths.add(remotePath + (remotePath.endsWith("/") ? "" : "/") + fileName);
|
||||
}
|
||||
return remoteFilePaths;
|
||||
}
|
||||
|
||||
/**
|
||||
* 全量同步
|
||||
*/
|
||||
public void syncFullTables(String dataStr) {
|
||||
batchSyncAllTables(dataStr);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 根据同步模式获取日期字符串
|
||||
*/
|
||||
private String getDateStrBySyncMode(String syncMode) {
|
||||
if ("FULL".equals(syncMode)) {
|
||||
// 全量同步使用昨天的日期
|
||||
return LocalDate.now().minusDays(1).format(FILE_DATE_FORMATTER);
|
||||
} else {
|
||||
// 增量同步使用今天的日期
|
||||
return LocalDate.now().format(FILE_DATE_FORMATTER);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建远程文件名匹配模式
|
||||
*/
|
||||
private String buildRemoteFilePattern(String syncMode, String dateStr) {
|
||||
return String.format(".*_%s_%s\\.txt$", syncMode, dateStr);
|
||||
}
|
||||
|
||||
/**
|
||||
* 列出远程目录下所有匹配的文件
|
||||
*/
|
||||
private List<String> listRemoteFiles(String pattern,Vector<?> files) throws Exception {
|
||||
List<String> matchedFiles = new ArrayList<>();
|
||||
|
||||
try {
|
||||
Pattern regexPattern = Pattern.compile(pattern);
|
||||
for (Object obj : files) {
|
||||
if (obj instanceof com.jcraft.jsch.ChannelSftp.LsEntry) {
|
||||
com.jcraft.jsch.ChannelSftp.LsEntry entry = (com.jcraft.jsch.ChannelSftp.LsEntry) obj;
|
||||
String fileName = entry.getFilename();
|
||||
|
||||
// 跳过目录和特殊文件
|
||||
if (entry.getAttrs().isDir() || fileName.equals(".") || fileName.equals("..")) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 匹配文件模式
|
||||
if (regexPattern.matcher(fileName).matches()) {
|
||||
matchedFiles.add(fileName);
|
||||
log.debug("匹配到文件: {}", fileName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("匹配远程文件失败: {}", pattern, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return matchedFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* 确保本地目录存在
|
||||
*/
|
||||
private void ensureLocalDirectory() {
|
||||
File dir = new File(localPath);
|
||||
if (!dir.exists()) {
|
||||
dir.mkdirs();
|
||||
log.info("创建本地目录: {}", localPath);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从文件路径提取表名
|
||||
* 文件名格式: 表名_FULL_20260127.txt 或 表名_INC_20260127.txt
|
||||
*/
|
||||
private String extractTableNameFromFile(String filePath) {
|
||||
String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
|
||||
Pattern pattern = Pattern.compile("^(.+?)_(FULL|INC)_\\d+\\.txt$");
|
||||
Matcher matcher = pattern.matcher(fileName);
|
||||
|
||||
if (matcher.find()) {
|
||||
return matcher.group(1);
|
||||
}
|
||||
|
||||
// 如果没有匹配到,直接去掉扩展名
|
||||
int extIndex = fileName.lastIndexOf(".");
|
||||
if (extIndex > 0) {
|
||||
return fileName.substring(0, extIndex);
|
||||
}
|
||||
|
||||
return fileName;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否全量导入(从文件名)
|
||||
*/
|
||||
private boolean isFullImport(String fileName) {
|
||||
return fileName.contains("_FULL");
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除本地文件
|
||||
*/
|
||||
private void deleteLocalFile(String localFilePath) {
|
||||
try {
|
||||
File localFile = new File(localFilePath);
|
||||
if (localFile.exists()) {
|
||||
localFile.delete();
|
||||
log.debug("已删除本地文件: {}", localFilePath);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("删除本地文件失败: {}", localFilePath, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从配置缓存中获取BLOB字段列表
|
||||
*/
|
||||
private List<String> getBlobColumnsFromCache(String tableName, Map<String, String> tableConfigCache) {
|
||||
String blobColumnsStr = tableConfigCache.get(tableName.toUpperCase());
|
||||
if (blobColumnsStr == null || blobColumnsStr.trim().isEmpty()) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
// 按逗号分隔,并转换为大写
|
||||
String[] blobArray = blobColumnsStr.split(",");
|
||||
List<String> blobColumns = new ArrayList<>();
|
||||
for (String blob : blobArray) {
|
||||
blobColumns.add(blob.trim().toUpperCase());
|
||||
}
|
||||
|
||||
if (!blobColumns.isEmpty()) {
|
||||
log.info("表 {} 配置的BLOB字段: {}", tableName, blobColumns);
|
||||
}
|
||||
return blobColumns;
|
||||
}
|
||||
|
||||
/**
|
||||
* 导入JSON数据到表(支持BLOB字段)
|
||||
*/
|
||||
@Transactional
|
||||
public void importJsonToTable(String tableName, String filePath, Map<String, String> tableConfigCache) throws Exception {
|
||||
log.info("开始导入表: {}, 文件: {}", tableName, filePath);
|
||||
|
||||
File dataFile = new File(filePath);
|
||||
if (!dataFile.exists()) {
|
||||
throw new RuntimeException("数据文件不存在: " + filePath);
|
||||
}
|
||||
|
||||
// 从文件名判断是否全量导入
|
||||
String fileName = dataFile.getName();
|
||||
boolean isFull = isFullImport(fileName);
|
||||
|
||||
// 读取JSON数据
|
||||
List<Map<String, Object>> dataList = objectMapper.readValue(
|
||||
dataFile,
|
||||
new TypeReference<List<Map<String, Object>>>() {}
|
||||
);
|
||||
|
||||
if (dataList.isEmpty()) {
|
||||
log.info("表 {} 无数据,跳过导入", tableName);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("表 {} 共 {} 条数据,{} 导入", tableName, dataList.size(), isFull ? "全量" : "增量(存在则更新)");
|
||||
|
||||
// 如果是全量导入,先清空表
|
||||
if (isFull) {
|
||||
log.info("全量导入,清空表: {}", tableName);
|
||||
dynamicSyncMapper.truncateTable(tableName);
|
||||
}
|
||||
|
||||
// 获取目标表的列信息
|
||||
List<String> columns = dynamicSyncMapper.getTableColumns(tableName.toUpperCase(), currentSchema);
|
||||
|
||||
if (columns == null || columns.isEmpty()) {
|
||||
throw new RuntimeException("表 " + tableName + " 不存在或没有列");
|
||||
}
|
||||
|
||||
// 获取时间字段列表
|
||||
List<String> timeColumns = getTimeColumns(tableName.toUpperCase());
|
||||
|
||||
// 【从缓存配置中获取BLOB字段列表】
|
||||
List<String> blobColumns = getBlobColumnsFromCache(tableName, tableConfigCache);
|
||||
|
||||
// 转换数据:Map key转大写,并将时间字段转换为Timestamp,BLOB字段转换为byte[]
|
||||
List<Map<String, Object>> convertedData = new ArrayList<>();
|
||||
for (Map<String, Object> row : dataList) {
|
||||
Map<String, Object> convertedRow = new LinkedHashMap<>();
|
||||
for (Map.Entry<String, Object> entry : row.entrySet()) {
|
||||
String key = entry.getKey().toUpperCase();
|
||||
Object value = entry.getValue();
|
||||
|
||||
// 如果是BLOB字段且值为字符串,转换为byte[]
|
||||
if (blobColumns.contains(key) && value instanceof String) {
|
||||
String blobStr = (String) value;
|
||||
if (blobStr != null && !blobStr.isEmpty()) {
|
||||
// 检查是否是 Base64 字符串(不是占位符)
|
||||
if (!blobStr.startsWith("[BLOB_")) {
|
||||
byte[] bytes = convertBase64ToBytes(blobStr);
|
||||
if (bytes != null && bytes.length > 0) {
|
||||
convertedRow.put(key, bytes);
|
||||
log.debug("BLOB字段 {} 已转换为byte[]", key);
|
||||
} else {
|
||||
convertedRow.put(key, null);
|
||||
log.warn("BLOB字段 {} 转换失败,设置为null", key);
|
||||
}
|
||||
} else {
|
||||
convertedRow.put(key, null);
|
||||
log.debug("BLOB字段 {} 为占位符,设置为null", key);
|
||||
}
|
||||
} else {
|
||||
convertedRow.put(key, null);
|
||||
}
|
||||
}
|
||||
// 如果是时间字段且值为字符串,转换为Timestamp
|
||||
else if (timeColumns.contains(key) && value instanceof String) {
|
||||
String timeStr = (String) value;
|
||||
if (timeStr != null && !timeStr.isEmpty()) {
|
||||
value = convertToTimestamp(timeStr);
|
||||
}
|
||||
}
|
||||
convertedRow.put(key, value);
|
||||
}
|
||||
convertedData.add(convertedRow);
|
||||
}
|
||||
|
||||
// ==============================================
|
||||
// 核心修改:增量使用MERGE INTO(存在更新/不存在插入)
|
||||
// ==============================================
|
||||
int totalCount = 0;
|
||||
if (isFull) {
|
||||
// 全量:批量INSERT
|
||||
String insertSql = buildInsertSql(tableName, columns);
|
||||
List<Map<String, Object>> batchList = new ArrayList<>();
|
||||
|
||||
for (Map<String, Object> row : convertedData) {
|
||||
batchList.add(row);
|
||||
if (batchList.size() >= BATCH_SIZE) {
|
||||
totalCount += executeBatchInsert(insertSql, columns, batchList);
|
||||
batchList.clear();
|
||||
log.info("表 {} 已导入 {} 条记录", tableName, totalCount);
|
||||
}
|
||||
}
|
||||
if (!batchList.isEmpty()) {
|
||||
totalCount += executeBatchInsert(insertSql, columns, batchList);
|
||||
}
|
||||
} else {
|
||||
// 增量:达梦MERGE INTO 存在则更新,不存在则插入
|
||||
List<String> primaryKeys = getTablePrimaryKeys(tableName);
|
||||
if (primaryKeys.isEmpty()) {
|
||||
throw new RuntimeException("增量同步必须有主键!表:" + tableName + " 未找到主键");
|
||||
}
|
||||
String mergeSql = buildMergeSql(tableName, columns, primaryKeys);
|
||||
List<Map<String, Object>> batchList = new ArrayList<>();
|
||||
|
||||
for (Map<String, Object> row : convertedData) {
|
||||
batchList.add(row);
|
||||
if (batchList.size() >= BATCH_SIZE) {
|
||||
totalCount += executeBatchMerge(mergeSql, columns, primaryKeys, batchList);
|
||||
batchList.clear();
|
||||
log.info("表 {} 已同步(更新/插入) {} 条记录", tableName, totalCount);
|
||||
}
|
||||
}
|
||||
if (!batchList.isEmpty()) {
|
||||
totalCount += executeBatchMerge(mergeSql, columns, primaryKeys, batchList);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("表 {} 导入完成,共 {} 条记录", tableName, totalCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将Base64字符串转换为byte[]
|
||||
*/
|
||||
private byte[] convertBase64ToBytes(String base64Str) {
|
||||
if (base64Str == null || base64Str.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
// 去除可能的引号
|
||||
String cleaned = base64Str.trim();
|
||||
if (cleaned.startsWith("\"") && cleaned.endsWith("\"")) {
|
||||
cleaned = cleaned.substring(1, cleaned.length() - 1);
|
||||
}
|
||||
|
||||
log.debug("Base64 字符串长度: {}, 前50字符: {}", cleaned.length(),
|
||||
cleaned.length() > 50 ? cleaned.substring(0, 50) : cleaned);
|
||||
|
||||
byte[] bytes = java.util.Base64.getDecoder().decode(cleaned);
|
||||
log.debug("解码后字节数组长度: {}", bytes.length);
|
||||
return bytes;
|
||||
} catch (Exception e) {
|
||||
log.warn("Base64解码失败: {}, 字符串前100字符: {}", e.getMessage(),
|
||||
base64Str.length() > 100 ? base64Str.substring(0, 100) : base64Str);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取表主键(达梦)
|
||||
*/
|
||||
private List<String> getTablePrimaryKeys(String tableName) {
|
||||
String sql = "SELECT COLUMN_NAME FROM ALL_CONS_COLUMNS A " +
|
||||
"JOIN ALL_CONSTRAINTS B ON A.CONSTRAINT_NAME = B.CONSTRAINT_NAME AND A.OWNER = B.OWNER " +
|
||||
"WHERE B.OWNER = ? AND B.TABLE_NAME = ? AND B.CONSTRAINT_TYPE = 'P' " +
|
||||
"ORDER BY A.POSITION";
|
||||
try {
|
||||
List<String> pks = targetJdbcTemplate.queryForList(sql, String.class, currentSchema, tableName);
|
||||
log.info("表 {} 主键: {}", tableName, pks);
|
||||
return pks != null ? pks : new ArrayList<>();
|
||||
} catch (Exception e) {
|
||||
log.error("获取主键失败", e);
|
||||
return new ArrayList<>();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建达梦MERGE INTO SQL(存在更新,不存在插入)
|
||||
*/
|
||||
private String buildMergeSql(String tableName, List<String> columns, List<String> primaryKeys) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("MERGE INTO ").append(tableName).append(" T1 ");
|
||||
sb.append("USING (SELECT ");
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
if (i > 0) sb.append(",");
|
||||
sb.append("? AS ").append(columns.get(i));
|
||||
}
|
||||
sb.append(" FROM DUAL) T2 ");
|
||||
sb.append("ON (");
|
||||
for (int i = 0; i < primaryKeys.size(); i++) {
|
||||
if (i > 0) sb.append(" AND ");
|
||||
sb.append("T1.").append(primaryKeys.get(i)).append(" = T2.").append(primaryKeys.get(i));
|
||||
}
|
||||
sb.append(") ");
|
||||
sb.append("WHEN MATCHED THEN UPDATE SET ");
|
||||
List<String> nonPkColumns = new ArrayList<>();
|
||||
for (String col : columns) {
|
||||
if (!primaryKeys.contains(col)) nonPkColumns.add(col);
|
||||
}
|
||||
for (int i = 0; i < nonPkColumns.size(); i++) {
|
||||
if (i > 0) sb.append(",");
|
||||
sb.append("T1.").append(nonPkColumns.get(i)).append(" = T2.").append(nonPkColumns.get(i));
|
||||
}
|
||||
sb.append(" WHEN NOT MATCHED THEN INSERT (");
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
if (i > 0) sb.append(",");
|
||||
sb.append(columns.get(i));
|
||||
}
|
||||
sb.append(") VALUES (");
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
if (i > 0) sb.append(",");
|
||||
sb.append("T2.").append(columns.get(i));
|
||||
}
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量执行MERGE
|
||||
*/
|
||||
private int executeBatchMerge(String sql, List<String> columns, List<String> primaryKeys, List<Map<String, Object>> batchList) {
|
||||
List<Object[]> batchArgs = new ArrayList<>();
|
||||
for (Map<String, Object> row : batchList) {
|
||||
Object[] args = new Object[columns.size()];
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
args[i] = row.get(columns.get(i));
|
||||
}
|
||||
batchArgs.add(args);
|
||||
}
|
||||
int[] results = targetJdbcTemplate.batchUpdate(sql, batchArgs);
|
||||
return Arrays.stream(results).sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取表中的时间类型字段
|
||||
*/
|
||||
private List<String> getTimeColumns(String tableName) {
|
||||
String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS " +
|
||||
"WHERE TABLE_NAME = ? AND OWNER = ? " +
|
||||
"AND DATA_TYPE IN ('DATE', 'TIMESTAMP')";
|
||||
try {
|
||||
List<String> timeColumns = targetJdbcTemplate.queryForList(sql, String.class, tableName, currentSchema);
|
||||
log.debug("表 {} 的时间字段: {}", tableName, timeColumns);
|
||||
return timeColumns != null ? timeColumns : new ArrayList<>();
|
||||
} catch (Exception e) {
|
||||
log.warn("获取时间字段失败: {}", e.getMessage());
|
||||
return new ArrayList<>();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 将字符串转换为 java.sql.Timestamp(增强版)
|
||||
*/
|
||||
private Timestamp convertToTimestamp(String timeStr) {
|
||||
if (timeStr == null || timeStr.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
// 去除可能的引号
|
||||
timeStr = timeStr.trim();
|
||||
if (timeStr.startsWith("\"") && timeStr.endsWith("\"")) {
|
||||
timeStr = timeStr.substring(1, timeStr.length() - 1);
|
||||
}
|
||||
|
||||
// 尝试多种解析器
|
||||
// 1. ISO_OFFSET_DATE_TIME (支持 2025-04-17T03:42:04.000+00:00)
|
||||
try {
|
||||
java.time.OffsetDateTime offsetDateTime = java.time.OffsetDateTime.parse(timeStr);
|
||||
return Timestamp.valueOf(offsetDateTime.toLocalDateTime());
|
||||
} catch (Exception e) {
|
||||
// 继续尝试其他格式
|
||||
}
|
||||
|
||||
// 2. ISO_LOCAL_DATE_TIME (支持 2025-04-17T03:42:04)
|
||||
try {
|
||||
java.time.LocalDateTime localDateTime = java.time.LocalDateTime.parse(timeStr);
|
||||
return Timestamp.valueOf(localDateTime);
|
||||
} catch (Exception e) {
|
||||
// 继续尝试其他格式
|
||||
}
|
||||
|
||||
// 3. 自定义格式: yyyy-MM-dd HH:mm:ss
|
||||
try {
|
||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
LocalDateTime ldt = LocalDateTime.parse(timeStr, formatter);
|
||||
return Timestamp.valueOf(ldt);
|
||||
} catch (Exception e) {
|
||||
// 继续尝试其他格式
|
||||
}
|
||||
|
||||
// 4. 日期格式: yyyy-MM-dd
|
||||
try {
|
||||
LocalDate ld = LocalDate.parse(timeStr, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
|
||||
return Timestamp.valueOf(ld.atStartOfDay());
|
||||
} catch (Exception e) {
|
||||
// 继续尝试其他格式
|
||||
}
|
||||
|
||||
// 5. 最后尝试直接转换
|
||||
return Timestamp.valueOf(timeStr);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("时间转换失败: {}, 返回null", timeStr);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建INSERT语句
|
||||
*/
|
||||
private String buildInsertSql(String tableName, List<String> columns) {
|
||||
StringBuilder sql = new StringBuilder("INSERT INTO ");
|
||||
sql.append(tableName).append(" (");
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
if (i > 0) sql.append(", ");
|
||||
sql.append(columns.get(i));
|
||||
}
|
||||
sql.append(") VALUES (");
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
if (i > 0) sql.append(", ");
|
||||
sql.append("?");
|
||||
}
|
||||
sql.append(")");
|
||||
return sql.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行批量插入
|
||||
*/
|
||||
private int executeBatchInsert(String sql, List<String> columns, List<Map<String, Object>> batchList) {
|
||||
List<Object[]> batchArgs = new ArrayList<>();
|
||||
for (Map<String, Object> row : batchList) {
|
||||
Object[] args = new Object[columns.size()];
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
args[i] = row.get(columns.get(i));
|
||||
}
|
||||
batchArgs.add(args);
|
||||
}
|
||||
int[] results = targetJdbcTemplate.batchUpdate(sql, batchArgs);
|
||||
return Arrays.stream(results).sum();
|
||||
}
|
||||
|
||||
/**
|
||||
* 手动导入指定的远程文件(单个文件处理)
|
||||
*/
|
||||
public void importRemoteFile(String remoteFileName) throws Exception {
|
||||
String remoteFilePath = remotePath + remoteFileName;
|
||||
String localFilePath = localPath + remoteFileName;
|
||||
|
||||
log.info("手动导入远程文件: {}", remoteFilePath);
|
||||
|
||||
// 下载并删除远程文件
|
||||
List<String> singleFileList = Collections.singletonList(remoteFilePath);
|
||||
List<DownloadResult> results = SftpUploadUtil.batchDownloadAndDelete(
|
||||
ip, port, username, password,
|
||||
singleFileList, localPath, true
|
||||
);
|
||||
|
||||
if (results.isEmpty() || !results.get(0).isSuccess()) {
|
||||
throw new Exception("下载失败: " + remoteFilePath);
|
||||
}
|
||||
|
||||
DownloadResult result = results.get(0);
|
||||
String tableName = extractTableNameFromFile(remoteFileName);
|
||||
|
||||
try {
|
||||
|
||||
// 缓存表配置
|
||||
Map<String, String> tableConfigCache = syncData();
|
||||
log.info("加载表配置缓存完成,共 {} 张表", tableConfigCache.size());
|
||||
|
||||
// 导入数据
|
||||
importJsonToTable(tableName, result.getLocalFilePath(),tableConfigCache);
|
||||
|
||||
// 导入成功后删除本地文件
|
||||
deleteLocalFile(result.getLocalFilePath());
|
||||
|
||||
log.info("手动导入完成: {}", remoteFileName);
|
||||
} catch (Exception e) {
|
||||
log.error("手动导入失败: {}", remoteFileName, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取远程目录下所有待处理文件列表
|
||||
*/
|
||||
public List<String> getPendingFiles() {
|
||||
List<String> matchedFiles = new ArrayList<>();
|
||||
Vector<?> files;
|
||||
try {
|
||||
// 列目录也纳入异常捕获
|
||||
files = SftpUploadUtil.listFiles(ip, port, username, password, remotePath);
|
||||
for (Object obj : files) {
|
||||
if (obj instanceof com.jcraft.jsch.ChannelSftp.LsEntry) {
|
||||
com.jcraft.jsch.ChannelSftp.LsEntry entry = (com.jcraft.jsch.ChannelSftp.LsEntry) obj;
|
||||
String fileName = entry.getFilename();
|
||||
matchedFiles.add(fileName);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("获取待处理文件列表失败,远端目录:{}", remotePath, e);
|
||||
return new ArrayList<>();
|
||||
}
|
||||
return matchedFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理本地已处理过的文件
|
||||
*/
|
||||
public void cleanLocalFiles(int daysToKeep) {
|
||||
File localDir = new File(localPath);
|
||||
if (!localDir.exists()) {
|
||||
return;
|
||||
}
|
||||
|
||||
long cutoffTime = System.currentTimeMillis() - (daysToKeep * 24L * 60 * 60 * 1000);
|
||||
File[] files = localDir.listFiles();
|
||||
|
||||
if (files != null) {
|
||||
int deletedCount = 0;
|
||||
for (File file : files) {
|
||||
if (file.isFile() && file.lastModified() < cutoffTime) {
|
||||
file.delete();
|
||||
deletedCount++;
|
||||
}
|
||||
}
|
||||
log.info("清理了服务器 {} 七天前本地txt文件", deletedCount);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 测试方法
|
||||
*/
|
||||
public void testR() {
|
||||
try {
|
||||
String remoteFilePath = remotePath + "test.txt";
|
||||
String localFilePath = localPath + "test.txt";
|
||||
|
||||
SftpUploadUtil.downloadFile(ip, port, username, password, remoteFilePath, localFilePath,true);
|
||||
|
||||
File file = new File(localFilePath);
|
||||
if (file.exists()) {
|
||||
String content = new String(Files.readAllBytes(Paths.get(localFilePath)));
|
||||
log.info("文件内容: {}", content);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("获取文件失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
//package com.njcn.relational.utils;
|
||||
//
|
||||
//import java.sql.Blob;
|
||||
//import java.sql.SQLException;
|
||||
//import java.util.Base64;
|
||||
//
|
||||
//public class BlobConverter {
|
||||
//
|
||||
// /**
|
||||
// * 将 Blob 转换为 Base64 字符串(推荐)
|
||||
// */
|
||||
// public static String blobToBase64(Blob blob) {
|
||||
// if (blob == null) {
|
||||
// return null;
|
||||
// }
|
||||
// try {
|
||||
// byte[] bytes = blob.getBytes(1, (int) blob.length());
|
||||
// return Base64.getEncoder().encodeToString(bytes);
|
||||
// } catch (SQLException e) {
|
||||
// throw new RuntimeException("Blob 转换失败", e);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 将 Blob 转换为字节数组
|
||||
// */
|
||||
// public static byte[] blobToBytes(Blob blob) {
|
||||
// if (blob == null) {
|
||||
// return null;
|
||||
// }
|
||||
// try {
|
||||
// return blob.getBytes(1, (int) blob.length());
|
||||
// } catch (SQLException e) {
|
||||
// throw new RuntimeException("Blob 转换失败", e);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 将 Blob 转换为十六进制字符串(用于小文件)
|
||||
// */
|
||||
// public static String blobToHex(Blob blob) {
|
||||
// if (blob == null) {
|
||||
// return null;
|
||||
// }
|
||||
// try {
|
||||
// byte[] bytes = blob.getBytes(1, (int) blob.length());
|
||||
// StringBuilder hexString = new StringBuilder();
|
||||
// for (byte b : bytes) {
|
||||
// String hex = Integer.toHexString(0xff & b);
|
||||
// if (hex.length() == 1) {
|
||||
// hexString.append('0');
|
||||
// }
|
||||
// hexString.append(hex);
|
||||
// }
|
||||
// return hexString.toString();
|
||||
// } catch (SQLException e) {
|
||||
// throw new RuntimeException("Blob 转换失败", e);
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
@@ -0,0 +1,533 @@
|
||||
package com.njcn.relational.utils;
|
||||
|
||||
/**
|
||||
* @Author: cdf
|
||||
* @CreateTime: 2026-05-26
|
||||
* @Description: SFTP 文件传输工具类(支持上传和下载)
|
||||
*/
|
||||
|
||||
import com.jcraft.jsch.*;
|
||||
import com.njcn.relational.pojo.bo.DownloadResult;
|
||||
import com.njcn.relational.pojo.bo.UploadResult;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* SFTP 文件传输工具类
|
||||
* 功能:将当前服务器上的文件传输到另一台服务器的指定目录,或从远程服务器下载文件
|
||||
*/
|
||||
@Slf4j
|
||||
public class SftpUploadUtil {
|
||||
|
||||
// 默认端口
|
||||
private static final int DEFAULT_PORT = 22;
|
||||
|
||||
/**
|
||||
* 上传文件到远程服务器
|
||||
*
|
||||
* @param host 远程服务器IP
|
||||
* @param port 远程服务器端口(默认22)
|
||||
* @param username 服务器用户名
|
||||
* @param password 服务器密码
|
||||
* @param localFilePath 本地/当前服务器文件路径(如 /data/test.txt)
|
||||
* @param remoteDir 远程目标目录(如 /opt/upload/)
|
||||
* @throws Exception 上传异常
|
||||
*/
|
||||
public static void uploadFile(String host, int port, String username, String password,
|
||||
String localFilePath, String remoteDir) throws Exception {
|
||||
|
||||
JSch jsch = new JSch();
|
||||
Session session = null;
|
||||
ChannelSftp channelSftp = null;
|
||||
InputStream inputStream = null;
|
||||
|
||||
try {
|
||||
// 1. 创建SSH会话
|
||||
session = jsch.getSession(username, host, port);
|
||||
session.setPassword(password);
|
||||
|
||||
// 2. 设置配置(跳过主机密钥检查,生产环境可根据安全要求调整)
|
||||
Properties config = new Properties();
|
||||
config.put("StrictHostKeyChecking", "no");
|
||||
config.put("server_host_key", "ssh-rsa,ssh-dss");
|
||||
session.setConfig(config);
|
||||
|
||||
// 3. 连接服务器
|
||||
log.info("正在连接远程服务器:" + host);
|
||||
session.connect();
|
||||
log.info("服务器连接成功!");
|
||||
|
||||
// 4. 打开SFTP通道
|
||||
Channel channel = session.openChannel("sftp");
|
||||
channel.connect();
|
||||
channelSftp = (ChannelSftp) channel;
|
||||
|
||||
// 5. 如果远程目录不存在,自动创建(支持多级目录)
|
||||
try {
|
||||
channelSftp.cd(remoteDir);
|
||||
} catch (SftpException e) {
|
||||
// 目录不存在则创建
|
||||
String[] dirs = remoteDir.split("/");
|
||||
String currentPath = "";
|
||||
for (String dir : dirs) {
|
||||
if (dir == null || dir.isEmpty()) continue;
|
||||
currentPath += "/" + dir;
|
||||
try {
|
||||
channelSftp.cd(currentPath);
|
||||
} catch (SftpException e1) {
|
||||
channelSftp.mkdir(currentPath);
|
||||
channelSftp.cd(currentPath);
|
||||
}
|
||||
}
|
||||
log.info("远程目录已创建:" + remoteDir);
|
||||
}
|
||||
|
||||
// 6. 读取本地文件
|
||||
File localFile = new File(localFilePath);
|
||||
if (!localFile.exists()) {
|
||||
throw new FileNotFoundException("本地文件不存在: " + localFilePath);
|
||||
}
|
||||
inputStream = new FileInputStream(localFile);
|
||||
|
||||
// 7. 上传文件
|
||||
log.info("开始上传文件:" + localFile.getName());
|
||||
channelSftp.put(inputStream, localFile.getName());
|
||||
log.info("文件上传完成!远程路径:" + remoteDir + localFile.getName());
|
||||
|
||||
} finally {
|
||||
// 8. 关闭流和连接
|
||||
if (inputStream != null) inputStream.close();
|
||||
if (channelSftp != null && channelSftp.isConnected()) channelSftp.disconnect();
|
||||
if (session != null && session.isConnected()) session.disconnect();
|
||||
log.info("连接已关闭");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 批量上传文件到远程服务器(共享同一个Session连接)
|
||||
*
|
||||
* @param host 远程服务器IP
|
||||
* @param port 远程服务器端口(默认22)
|
||||
* @param username 服务器用户名
|
||||
* @param password 服务器密码
|
||||
* @param localFilePaths 本地文件路径列表
|
||||
* @param remoteDir 远程目标目录(如 /opt/upload/)
|
||||
* @return 上传结果列表,包含每个文件的上传状态
|
||||
* @throws Exception 连接异常
|
||||
*/
|
||||
/**
|
||||
* 批量上传文件到远程服务器(共享同一个Session连接)【稳定版:支持保活 + 重试】
|
||||
*/
|
||||
public static List<UploadResult> batchUploadFiles(String host, int port, String username, String password,
|
||||
List<String> localFilePaths, String remoteDir) throws Exception {
|
||||
|
||||
List<UploadResult> results = new ArrayList<>();
|
||||
JSch jsch = new JSch();
|
||||
Session session = null;
|
||||
ChannelSftp channelSftp = null;
|
||||
|
||||
try {
|
||||
session = jsch.getSession(username, host, port);
|
||||
session.setPassword(password);
|
||||
|
||||
Properties config = new Properties();
|
||||
config.put("StrictHostKeyChecking", "no");
|
||||
config.put("server_host_key", "ssh-rsa,ssh-dss");
|
||||
|
||||
session.setConfig(config);
|
||||
|
||||
session.setTimeout(30000);
|
||||
// ✅ 保活间隔:每 10 秒发一次心跳
|
||||
session.setServerAliveInterval(10000);
|
||||
session.setServerAliveCountMax(5);
|
||||
|
||||
log.info("正在连接远程服务器:" + host + ":" + port);
|
||||
session.connect();
|
||||
log.info("服务器连接成功!");
|
||||
|
||||
Channel channel = session.openChannel("sftp");
|
||||
channel.connect();
|
||||
channelSftp = (ChannelSftp) channel;
|
||||
channelSftp.setFilenameEncoding("UTF-8");
|
||||
|
||||
|
||||
log.info("开始批量上传文件,共 " + localFilePaths.size() + " 个文件, 上传目录: " + remoteDir);
|
||||
|
||||
for (String localFilePath : localFilePaths) {
|
||||
UploadResult result = new UploadResult();
|
||||
result.setLocalFilePath(localFilePath);
|
||||
|
||||
// ✅ 失败自动重试 3 次
|
||||
int retry = 3;
|
||||
boolean uploadOk = false;
|
||||
|
||||
while (retry > 0 && !uploadOk) {
|
||||
try (InputStream inputStream = new FileInputStream(localFilePath)) {
|
||||
File localFile = new File(localFilePath);
|
||||
if (!localFile.exists()) {
|
||||
result.setSuccess(false);
|
||||
result.setErrorMessage("文件不存在");
|
||||
break;
|
||||
}
|
||||
|
||||
String fileName = localFile.getName();
|
||||
// ✅ 拼接完整绝对路径
|
||||
String remoteFullPath = remoteDir + (remoteDir.endsWith("/") ? "" : "/") + fileName;
|
||||
|
||||
// ✅ 直接用完整路径上传,不依赖 cd 目录
|
||||
channelSftp.put(inputStream, remoteFullPath);
|
||||
// ✅ 用完整路径校验,绝对不会错
|
||||
channelSftp.stat(remoteFullPath);
|
||||
|
||||
uploadOk = true;
|
||||
|
||||
result.setSuccess(true);
|
||||
result.setRemotePath(remoteFullPath);
|
||||
result.setFileSize(localFile.length());
|
||||
log.info("上传成功: " + fileName + " 剩余重试:" + retry);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.info(e.getMessage());
|
||||
retry--;
|
||||
log.error("上传失败,剩余重试次数 " + retry + ":" + localFilePath);
|
||||
if (retry == 0) {
|
||||
result.setSuccess(false);
|
||||
result.setErrorMessage("最终失败:" + e.getMessage());
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
results.add(result);
|
||||
}
|
||||
|
||||
long successCount = results.stream().filter(UploadResult::isSuccess).count();
|
||||
long failCount = results.size() - successCount;
|
||||
log.info("批量上传完成!成功: " + successCount + ", 失败: " + failCount);
|
||||
|
||||
} finally {
|
||||
safeDisconnect(session,channelSftp);
|
||||
log.info("连接已关闭");
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* 安全的关闭方法
|
||||
*/
|
||||
private static void safeDisconnect(Session session, ChannelSftp channelSftp) {
|
||||
if (channelSftp != null) {
|
||||
try {
|
||||
if (channelSftp.isConnected()) {
|
||||
channelSftp.exit(); // 退出 SFTP 通道
|
||||
channelSftp.disconnect();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("关闭 SFTP 通道失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
if (session != null) {
|
||||
try {
|
||||
if (session.isConnected()) {
|
||||
session.disconnect();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("关闭 Session 失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 从远程服务器下载文件
|
||||
*
|
||||
* @param host 远程服务器IP
|
||||
* @param port 远程服务器端口(默认22)
|
||||
* @param username 服务器用户名
|
||||
* @param password 服务器密码
|
||||
* @param remoteFilePath 远程文件路径(如 /opt/data/test.txt)
|
||||
* @param localDir 本地目标目录(如 /data/download/)
|
||||
* @return 下载后的本地文件路径
|
||||
* @throws Exception 下载异常
|
||||
*/
|
||||
public static String downloadFile(String host, int port, String username, String password,
|
||||
String remoteFilePath, String localDir,
|
||||
boolean deleteAfterDownload) throws Exception {
|
||||
|
||||
JSch jsch = new JSch();
|
||||
Session session = null;
|
||||
ChannelSftp channelSftp = null;
|
||||
OutputStream outputStream = null;
|
||||
|
||||
try {
|
||||
// 1. 创建SSH会话
|
||||
session = jsch.getSession(username, host, port);
|
||||
session.setPassword(password);
|
||||
|
||||
// 2. 设置配置
|
||||
Properties config = new Properties();
|
||||
config.put("StrictHostKeyChecking", "no");
|
||||
config.put("server_host_key", "ssh-rsa,ssh-dss");
|
||||
session.setConfig(config);
|
||||
|
||||
// 3. 连接服务器
|
||||
log.info("正在连接远程服务器:" + host);
|
||||
session.connect();
|
||||
log.info("服务器连接成功!");
|
||||
|
||||
// 4. 打开SFTP通道
|
||||
Channel channel = session.openChannel("sftp");
|
||||
channel.connect();
|
||||
channelSftp = (ChannelSftp) channel;
|
||||
|
||||
// 5. 检查远程文件是否存在
|
||||
try {
|
||||
channelSftp.stat(remoteFilePath);
|
||||
} catch (SftpException e) {
|
||||
throw new FileNotFoundException("远程文件不存在: " + remoteFilePath);
|
||||
}
|
||||
|
||||
// 6. 创建本地目录(如果不存在)
|
||||
File localDirFile = new File(localDir);
|
||||
if (!localDirFile.exists()) {
|
||||
localDirFile.mkdirs();
|
||||
log.info("创建本地目录:" + localDir);
|
||||
}
|
||||
|
||||
// 7. 生成本地文件路径
|
||||
String fileName = remoteFilePath.substring(remoteFilePath.lastIndexOf("/") + 1);
|
||||
String localFilePath = localDir + fileName;
|
||||
outputStream = new FileOutputStream(new File(localFilePath));
|
||||
|
||||
// 8. 下载文件
|
||||
log.info("开始下载文件:" + fileName);
|
||||
channelSftp.get(remoteFilePath, outputStream);
|
||||
log.info("文件下载完成!本地路径:" + localFilePath);
|
||||
|
||||
|
||||
// 下载成功后删除远程文件
|
||||
if (deleteAfterDownload) {
|
||||
try {
|
||||
channelSftp.rm(remoteFilePath);
|
||||
log.info("已删除远程文件: " + remoteFilePath);
|
||||
} catch (SftpException e) {
|
||||
log.error("删除远程文件失败: " + remoteFilePath + ", " + e.getMessage());
|
||||
}
|
||||
}
|
||||
return localFilePath;
|
||||
|
||||
} finally {
|
||||
// 9. 关闭流和连接
|
||||
if (outputStream != null) outputStream.close();
|
||||
if (channelSftp != null && channelSftp.isConnected()) channelSftp.disconnect();
|
||||
if (session != null && session.isConnected()) session.disconnect();
|
||||
log.info("连接已关闭");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 批量下载并删除远程文件(下载成功后删除远程文件)
|
||||
*
|
||||
* @param host 远程服务器IP
|
||||
* @param port 远程服务器端口
|
||||
* @param username 用户名
|
||||
* @param password 密码
|
||||
* @param remoteFilePaths 远程文件路径列表
|
||||
* @param localDir 本地目标目录
|
||||
* @param deleteAfterDownload 下载后是否删除远程文件
|
||||
* @return 下载结果列表
|
||||
* @throws Exception 连接异常
|
||||
*/
|
||||
public static List<DownloadResult> batchDownloadAndDelete(String host, int port, String username, String password,
|
||||
List<String> remoteFilePaths, String localDir,
|
||||
boolean deleteAfterDownload) throws Exception {
|
||||
|
||||
List<DownloadResult> results = new ArrayList<>();
|
||||
JSch jsch = new JSch();
|
||||
Session session = null;
|
||||
ChannelSftp channelSftp = null;
|
||||
|
||||
try {
|
||||
// 创建并连接
|
||||
session = jsch.getSession(username, host, port);
|
||||
session.setPassword(password);
|
||||
Properties config = new Properties();
|
||||
config.put("StrictHostKeyChecking", "no");
|
||||
config.put("server_host_key", "ssh-rsa,ssh-dss");
|
||||
session.setConfig(config);
|
||||
session.setTimeout(30000);
|
||||
session.connect();
|
||||
log.info("服务器连接成功!");
|
||||
|
||||
Channel channel = session.openChannel("sftp");
|
||||
channel.connect();
|
||||
channelSftp = (ChannelSftp) channel;
|
||||
|
||||
// 创建本地目录
|
||||
File localDirFile = new File(localDir);
|
||||
if (!localDirFile.exists()) {
|
||||
localDirFile.mkdirs();
|
||||
log.info("创建本地目录:" + localDir);
|
||||
}
|
||||
|
||||
log.info("开始批量下载文件,共 " + remoteFilePaths.size() + " 个文件");
|
||||
|
||||
for (String remoteFilePath : remoteFilePaths) {
|
||||
DownloadResult result = new DownloadResult();
|
||||
result.setRemoteFilePath(remoteFilePath);
|
||||
|
||||
OutputStream outputStream = null;
|
||||
try {
|
||||
String fileName = remoteFilePath.substring(remoteFilePath.lastIndexOf("/") + 1);
|
||||
String localFilePath = localDir + (localDir.endsWith("/") ? "" : "/") + fileName;
|
||||
|
||||
outputStream = new FileOutputStream(new File(localFilePath));
|
||||
channelSftp.get(remoteFilePath, outputStream);
|
||||
|
||||
result.setSuccess(true);
|
||||
result.setLocalFilePath(localFilePath);
|
||||
log.info("下载成功: " + fileName);
|
||||
|
||||
// 下载成功后删除远程文件
|
||||
if (deleteAfterDownload) {
|
||||
try {
|
||||
channelSftp.rm(remoteFilePath);
|
||||
log.info("已删除远程文件: " + remoteFilePath);
|
||||
} catch (SftpException e) {
|
||||
log.error("删除远程文件失败: " + remoteFilePath + ", " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
result.setSuccess(false);
|
||||
result.setErrorMessage(e.getMessage());
|
||||
log.error("下载失败: " + remoteFilePath + ", 错误: " + e.getMessage());
|
||||
} finally {
|
||||
if (outputStream != null) {
|
||||
try {
|
||||
outputStream.close();
|
||||
} catch (IOException e) {
|
||||
// 忽略
|
||||
}
|
||||
}
|
||||
}
|
||||
results.add(result);
|
||||
}
|
||||
|
||||
long successCount = results.stream().filter(DownloadResult::isSuccess).count();
|
||||
long failCount = results.size() - successCount;
|
||||
log.info("批量下载完成!成功: " + successCount + ", 失败: " + failCount);
|
||||
|
||||
} finally {
|
||||
if (channelSftp != null && channelSftp.isConnected()) {
|
||||
channelSftp.disconnect();
|
||||
}
|
||||
if (session != null && session.isConnected()) {
|
||||
session.disconnect();
|
||||
}
|
||||
log.info("连接已关闭");
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 列出远程目录下的文件
|
||||
*
|
||||
* @param host 远程服务器IP
|
||||
* @param port 远程服务器端口
|
||||
* @param username 用户名
|
||||
* @param password 密码
|
||||
* @param remoteDir 远程目录
|
||||
* @return 文件列表
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public static java.util.Vector<ChannelSftp.LsEntry> listFiles(String host, int port,
|
||||
String username, String password,
|
||||
String remoteDir) throws Exception {
|
||||
JSch jsch = new JSch();
|
||||
Session session = null;
|
||||
ChannelSftp channelSftp = null;
|
||||
|
||||
try {
|
||||
session = jsch.getSession(username, host, port);
|
||||
session.setPassword(password);
|
||||
Properties config = new Properties();
|
||||
config.put("StrictHostKeyChecking", "no");
|
||||
config.put("server_host_key", "ssh-rsa,ssh-dss");
|
||||
|
||||
session.setConfig(config);
|
||||
session.connect();
|
||||
|
||||
Channel channel = session.openChannel("sftp");
|
||||
channel.connect();
|
||||
channelSftp = (ChannelSftp) channel;
|
||||
|
||||
return channelSftp.ls(remoteDir);
|
||||
|
||||
}catch (Exception e){
|
||||
log.error("查看远程目录失败{}",remoteDir,e);
|
||||
throw new RuntimeException(e);
|
||||
}finally {
|
||||
if (channelSftp != null && channelSftp.isConnected()) channelSftp.disconnect();
|
||||
if (session != null && session.isConnected()) session.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除远程文件
|
||||
*
|
||||
* @param host 远程服务器IP
|
||||
* @param port 远程服务器端口
|
||||
* @param username 用户名
|
||||
* @param password 密码
|
||||
* @param remoteFilePath 远程文件路径
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
public static void deleteRemoteFile(String host, int port, String username, String password,
|
||||
String remoteFilePath) throws Exception {
|
||||
JSch jsch = new JSch();
|
||||
Session session = null;
|
||||
ChannelSftp channelSftp = null;
|
||||
|
||||
try {
|
||||
session = jsch.getSession(username, host, port);
|
||||
session.setPassword(password);
|
||||
Properties config = new Properties();
|
||||
config.put("StrictHostKeyChecking", "no");
|
||||
config.put("server_host_key", "ssh-rsa,ssh-dss");
|
||||
|
||||
session.setConfig(config);
|
||||
session.connect();
|
||||
|
||||
Channel channel = session.openChannel("sftp");
|
||||
channel.connect();
|
||||
channelSftp = (ChannelSftp) channel;
|
||||
|
||||
channelSftp.rm(remoteFilePath);
|
||||
log.info("远程文件已删除:" + remoteFilePath);
|
||||
|
||||
} finally {
|
||||
if (channelSftp != null && channelSftp.isConnected()) channelSftp.disconnect();
|
||||
if (session != null && session.isConnected()) session.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 上传文件(使用默认端口22)
|
||||
*/
|
||||
public static void uploadFile(String host, String username, String password,
|
||||
String localFilePath, String remoteDir) throws Exception {
|
||||
uploadFile(host, DEFAULT_PORT, username, password, localFilePath, remoteDir);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
spring:
|
||||
#数据库内容配置
|
||||
datasource:
|
||||
druid:
|
||||
driver-class-name: dm.jdbc.driver.DmDriver
|
||||
url: jdbc:dm://192.168.1.21:5236/PQSINFO_LN?useUnicode=true&characterEncoding=utf-8
|
||||
username: PQSINFO_LN
|
||||
password: Pqsadmin123
|
||||
#初始化建立物理连接的个数、最小、最大连接数
|
||||
initial-size: 5
|
||||
min-idle: 5
|
||||
max-active: 50
|
||||
#获取连接最大等待时间,单位毫秒
|
||||
max-wait: 60000
|
||||
#链接保持空间而不被驱逐的最长时间,单位毫秒
|
||||
min-evictable-idle-time-millis: 300000
|
||||
validation-query: select 1
|
||||
test-while-idle: true
|
||||
test-on-borrow: false
|
||||
test-on-return: false
|
||||
pool-prepared-statements: true
|
||||
max-pool-prepared-statement-per-connection-size: 20
|
||||
|
||||
|
||||
|
||||
sync:
|
||||
ip: 192.168.1.68
|
||||
username: root
|
||||
password: dnzl@#001
|
||||
import:
|
||||
localPath: D:/data/import/
|
||||
remotePath: /home/export/
|
||||
export:
|
||||
localPath: D:/data/export/
|
||||
remotePath: /home/export/
|
||||
@@ -0,0 +1,36 @@
|
||||
spring:
|
||||
#数据库内容配置
|
||||
datasource:
|
||||
druid:
|
||||
driver-class-name: dm.jdbc.driver.DmDriver
|
||||
url: jdbc:dm://dky/PQSADMIN?useUnicode=true&characterEncoding=utf-8
|
||||
username: PQSADMIN
|
||||
password: Pqsadmin.123
|
||||
#初始化建立物理连接的个数、最小、最大连接数
|
||||
initial-size: 5
|
||||
min-idle: 5
|
||||
max-active: 50
|
||||
#获取连接最大等待时间,单位毫秒
|
||||
max-wait: 60000
|
||||
#链接保持空间而不被驱逐的最长时间,单位毫秒
|
||||
min-evictable-idle-time-millis: 300000
|
||||
validation-query: select 1 from dual
|
||||
test-while-idle: true
|
||||
test-on-borrow: false
|
||||
test-on-return: false
|
||||
pool-prepared-statements: true
|
||||
max-pool-prepared-statement-per-connection-size: 20
|
||||
|
||||
|
||||
|
||||
sync:
|
||||
ip: 10.21.39.5
|
||||
port: 22
|
||||
username: njcn
|
||||
password: "@#001njcnpqs"
|
||||
import:
|
||||
localPath: /home/dcloud/import/
|
||||
remotePath: /home/d5000/liaoning/file-recv/7/isolate_recv/
|
||||
export:
|
||||
localPath: D:/data/export/
|
||||
remotePath: /home/export/
|
||||
@@ -0,0 +1,35 @@
|
||||
spring:
|
||||
#数据库内容配置
|
||||
datasource:
|
||||
druid:
|
||||
driver-class-name: dm.jdbc.driver.DmDriver
|
||||
url: jdbc:dm://192.168.1.21:5236/PQSADMIN?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT
|
||||
username: PQSADMIN
|
||||
password: Pqsadmin123
|
||||
#初始化建立物理连接的个数、最小、最大连接数
|
||||
initial-size: 5
|
||||
min-idle: 5
|
||||
max-active: 50
|
||||
#获取连接最大等待时间,单位毫秒
|
||||
max-wait: 60000
|
||||
#链接保持空间而不被驱逐的最长时间,单位毫秒
|
||||
min-evictable-idle-time-millis: 300000
|
||||
validation-query: select 1
|
||||
test-while-idle: true
|
||||
test-on-borrow: false
|
||||
test-on-return: false
|
||||
pool-prepared-statements: true
|
||||
max-pool-prepared-statement-per-connection-size: 20
|
||||
|
||||
|
||||
|
||||
sync:
|
||||
ip: 192.168.1.68
|
||||
username: root
|
||||
password: dnzl@#001
|
||||
export:
|
||||
localPath: D:/data/export/
|
||||
remotePath: /home/export/
|
||||
import:
|
||||
localPath: D:/data/export/
|
||||
remotePath: /home/export/
|
||||
@@ -0,0 +1,36 @@
|
||||
spring:
|
||||
#数据库内容配置
|
||||
datasource:
|
||||
druid:
|
||||
driver-class-name: dm.jdbc.driver.DmDriver
|
||||
url: jdbc:dm://10.11.7.6:15236/PQSADMIN?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT
|
||||
username: PQSADMIN
|
||||
password: Pqsadmin123
|
||||
#初始化建立物理连接的个数、最小、最大连接数
|
||||
initial-size: 5
|
||||
min-idle: 5
|
||||
max-active: 50
|
||||
#获取连接最大等待时间,单位毫秒
|
||||
max-wait: 60000
|
||||
#链接保持空间而不被驱逐的最长时间,单位毫秒
|
||||
min-evictable-idle-time-millis: 300000
|
||||
validation-query: select 1
|
||||
test-while-idle: true
|
||||
test-on-borrow: false
|
||||
test-on-return: false
|
||||
pool-prepared-statements: true
|
||||
max-pool-prepared-statement-per-connection-size: 20
|
||||
|
||||
|
||||
|
||||
sync:
|
||||
ip: 10.11.8.22
|
||||
port: 22
|
||||
username: njcn
|
||||
password: "@#001njcnpqs"
|
||||
export:
|
||||
localPath: /home/lnyw/export/
|
||||
remotePath: /home/d5000/liaoning/file-send/7/
|
||||
import:
|
||||
localPath: D:/data/export/
|
||||
remotePath: /home/import/
|
||||
@@ -0,0 +1,81 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||
|
||||
<!-- 应用名称 -->
|
||||
<springProperty scope="context" name="appName" source="spring.application.name" defaultValue="app"/>
|
||||
|
||||
<!-- ========================== 控制台输出 ========================== -->
|
||||
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
<charset>UTF-8</charset>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- ========================== 全部日志 ========================== -->
|
||||
<appender name="FILE_ALL" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>/home/lnyw/logs/transport/transport${appName}.log</file>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>/home/lnyw/logs/transport/transport${appName}-%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<maxHistory>15</maxHistory>
|
||||
<totalSizeCap>10GB</totalSizeCap>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
<charset>UTF-8</charset>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- ========================== INFO 日志(修复版) ========================== -->
|
||||
<appender name="FILE_INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>/home/lnyw/logs/transport/transport${appName}-info.log</file>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>/home/lnyw/logs/transport/transport${appName}-info-%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<maxHistory>15</maxHistory>
|
||||
<totalSizeCap>5GB</totalSizeCap>
|
||||
</rollingPolicy>
|
||||
<!-- 过滤 INFO -->
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<level>INFO</level>
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
<charset>UTF-8</charset>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- ========================== ERROR 日志 ========================== -->
|
||||
<appender name="FILE_ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>/home/lnyw/logs/transport/transport${appName}-error.log</file>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>/home/lnyw/logs/transport/transport${appName}-error-%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<maxHistory>30</maxHistory>
|
||||
<totalSizeCap>5GB</totalSizeCap>
|
||||
</rollingPolicy>
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<level>ERROR</level>
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
<charset>UTF-8</charset>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- ========================== 根日志 ========================== -->
|
||||
<root level="info">
|
||||
<appender-ref ref="CONSOLE"/>
|
||||
<appender-ref ref="FILE_ALL"/>
|
||||
<appender-ref ref="FILE_INFO"/>
|
||||
<appender-ref ref="FILE_ERROR"/>
|
||||
</root>
|
||||
|
||||
<!-- 日志级别 -->
|
||||
<logger name="com" level="info"/>
|
||||
<logger name="org.springframework" level="info"/>
|
||||
<logger name="org.mybatis" level="error"/>
|
||||
|
||||
</configuration>
|
||||
3
relational_migration/relational_target/src/main/resources/static/js/axios.min.js
vendored
Normal file
3
relational_migration/relational_target/src/main/resources/static/js/axios.min.js
vendored
Normal file
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
6
relational_migration/relational_target/src/main/resources/static/js/vue.min.js
vendored
Normal file
6
relational_migration/relational_target/src/main/resources/static/js/vue.min.js
vendored
Normal file
File diff suppressed because one or more lines are too long
@@ -0,0 +1,28 @@
|
||||
<!DOCTYPE html>
|
||||
<html xmlns:th="http://www.w3.org/1999/xhtml">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>登录</title>
|
||||
<style>
|
||||
body { text-align:center; margin-top:50px; }
|
||||
.box { display:inline-block; padding:20px; border:1px solid #ccc; border-radius:8px; }
|
||||
.err { color:red; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="box">
|
||||
<h2>系统登录</h2>
|
||||
<!-- 提交到 /doLogin,Spring Security 自动处理 -->
|
||||
<form action="/login" method="post">
|
||||
<div>
|
||||
<input type="text" name="username" value="data_njcn" placeholder="用户名" required><br><br>
|
||||
</div>
|
||||
<div>
|
||||
<input type="password" value="dnzl@#002" name="password" placeholder="密码" required><br><br>
|
||||
</div>
|
||||
<button type="submit">登录</button>
|
||||
</form>
|
||||
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
Reference in New Issue
Block a user