文件夹层级
AutoSynchronizeFileCopy.py
config.json
AutoSynchronizeFileCopy.py :
import os
import re
import json
import time
import shutil
import logging
import datetime
from typing import Dict, Any, List, Tuple, Optional
# 配置日志格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("sync_log.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger()
class SharedFolderSyncer:
def __init__(self, config_path: str):
self.config = self.load_config(config_path)
self.state_file = "sync_state.json"
self.sync_state = self.load_sync_state()
# 创建保存根目录
os.makedirs(self.config['geely_save_path'], exist_ok=True)
logger.info(f"保存根目录已创建: {self.config['geely_save_path']}")
# 记录共享路径连接状态
self.connection_status = {}
# 标记是否已完成首次同步
self.first_run_completed = self.sync_state.get('first_run_completed', False)
logger.info(f"首次同步状态: {'已完成' if self.first_run_completed else '未完成'}")
# 解析首次同步时间
self.initial_sync_time = self.parse_initial_sync_time()
logger.info(f"首次同步基准时间: {datetime.datetime.fromtimestamp(self.initial_sync_time)}")
def parse_initial_sync_time(self) -> float:
"""解析首次同步时间字符串为时间戳"""
try:
sync_time_str = self.config['sync_time']
sync_dt = datetime.datetime.strptime(sync_time_str, "%Y-%m-%d %H:%M")
return sync_dt.timestamp()
except Exception as e:
logger.error(f"解析首次同步时间失败: {e}, 将使用当前时间")
return time.time()
def load_config(self, path: str) -> Dict[str, Any]:
"""加载配置文件"""
try:
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"加载配置文件失败: {e}")
raise
def load_sync_state(self) -> Dict[str, Any]:
"""加载同步状态文件(记录最后同步时间和首次运行状态)"""
state = {
'first_run_completed': False,
'last_sync_times': {k: 0.0 for k in self.config['ft_share_paths'].keys()}
}
if os.path.exists(self.state_file):
try:
with open(self.state_file, 'r') as f:
saved_state = json.load(f)
# 兼容旧版本和新版本状态文件
if 'first_run_completed' in saved_state:
state['first_run_completed'] = saved_state['first_run_completed']
if 'last_sync_times' in saved_state:
state['last_sync_times'] = saved_state['last_sync_times']
else:
# 旧版本状态文件只有同步时间
for k in self.config['ft_share_paths'].keys():
state['last_sync_times'][k] = saved_state.get(k, 0.0)
except Exception as e:
logger.error(f"加载状态文件失败: {e}")
# 状态文件损坏时创建新文件
self.save_sync_state(state)
return state
def save_sync_state(self):
"""保存同步状态"""
try:
with open(self.state_file, 'w') as f:
json.dump(self.sync_state, f, indent=2)
logger.info("同步状态已保存")
logger.info(f"当前同步状态: {json.dumps(self.sync_state, indent=2)}")
except Exception as e:
logger.error(f"保存状态文件失败: {e}")
def check_share_connection(self, share_path: str) -> bool:
"""检查共享路径是否可访问"""
try:
# 尝试访问共享路径
if os.path.exists(share_path):
self.connection_status[share_path] = True
logger.info(f"成功连接到共享路径: {share_path}")
return True
else:
self.connection_status[share_path] = False
logger.warning(f"无法连接到共享路径: {share_path}")
return False
except Exception as e:
self.connection_status[share_path] = False
logger.error(f"检查共享路径连接失败: {share_path}, 错误: {str(e)}")
return False
def extract_datetime_parts(self, folder_name: str) -> Optional[Tuple[str, str]]:
"""
从文件夹名称中提取日期和时间部分
例如: "163878_20250809_145458" -> ("20250809", "145458")
"""
# 匹配下划线分隔的8位日期和6位时间
match = re.search(r'(d{8})_(d{6})', folder_name)
if match:
return match.group(1), match.group(2)
# 尝试其他可能的格式
match = re.search(r'(d{8})-(d{6})', folder_name)
if match:
return match.group(1), match.group(2)
# 尝试纯日期时间格式
match = re.search(r'(d{8})_?(d{6})', folder_name)
if match:
return match.group(1), match.group(2)
logger.warning(f"无法从文件夹名称提取日期时间部分: {folder_name}")
return None
def copy_large_file(self, src: str, dst: str, buffer_size=1024 * 1024 * 100) -> bool:
"""分块复制大文件避免内存溢出"""
try:
# 确保目标目录存在
os.makedirs(os.path.dirname(dst), exist_ok=True)
# 复制文件内容
with open(src, 'rb') as f_src:
with open(dst, 'wb') as f_dst:
while True:
chunk = f_src.read(buffer_size)
if not chunk:
break
f_dst.write(chunk)
# 保留原始修改时间
shutil.copystat(src, dst)
logger.info(f"成功复制文件: {src} -> {dst}")
return True
except Exception as e:
logger.error(f"复制文件失败 {src} -> {dst}: {str(e)}")
# 清理部分复制的文件
if os.path.exists(dst):
try:
os.remove(dst)
except:
pass
return False
def sync_folder(self, share_key: str, folder_info: Dict[str, str]):
"""同步单个共享文件夹及其所有内容(基于文件夹修改时间)"""
share_path = folder_info['route']
module = folder_info['module']
car_number = folder_info['car_number']
# 在开始同步前记录时间戳
sync_start_time = time.time()
logger.info(f"共享文件夹 {share_key} 同步开始时间: {datetime.datetime.fromtimestamp(sync_start_time)}")
# 确定上次同步时间
if not self.first_run_completed:
# 首次同步使用config.json中的sync_time
last_sync = self.initial_sync_time
logger.info(f"首次同步使用配置的初始同步时间: {datetime.datetime.fromtimestamp(last_sync)}")
else:
# 后续同步使用sync_state.json中的last_sync_times
last_sync = self.sync_state['last_sync_times'].get(share_key, 0.0)
logger.info(f"上次同步时间: {datetime.datetime.fromtimestamp(last_sync)}")
# 检查共享路径连接
if not self.check_share_connection(share_path):
logger.error(f"无法访问共享路径,跳过同步: {share_path}")
# 即使无法连接,如果是首次同步,也要设置last_sync_times为initial_sync_time
if not self.first_run_completed:
self.sync_state['last_sync_times'][share_key] = self.initial_sync_time
logger.info(f"首次同步无法访问共享路径 {share_key},已设置其同步时间为配置的初始时间")
return
# 创建基础目标目录
base_target_dir = os.path.join(
self.config['geely_save_path'],
module,
car_number
)
os.makedirs(base_target_dir, exist_ok=True)
logger.info(f"开始同步: {share_key}")
logger.info(f"源路径: {share_path}")
logger.info(f"目标基础路径: {base_target_dir}")
# 记录新增内容
new_folders = []
new_files = []
# 标记是否有找到需要同步的文件夹
found_folder_to_sync = False
# 遍历共享文件夹下的所有日期文件夹
for date_folder in os.listdir(share_path):
date_folder_path = os.path.join(share_path, date_folder)
# 跳过文件,只处理文件夹
if not os.path.isdir(date_folder_path):
continue
try:
# 获取文件夹修改时间
folder_mtime = os.path.getmtime(date_folder_path)
folder_mtime_dt = datetime.datetime.fromtimestamp(folder_mtime)
# 检查文件夹是否更新
if folder_mtime > last_sync:
found_folder_to_sync = True
# 提取日期和时间部分
datetime_parts = self.extract_datetime_parts(date_folder)
if datetime_parts:
date_str, time_str = datetime_parts
logger.info(f"处理日期文件夹: {date_folder} -> 日期: {date_str}, 时间: {time_str}")
# 创建目录层级: 日期/ROS&BLF/原始名称
date_dir = os.path.join(base_target_dir, date_str)
rosblf_dir = os.path.join(date_dir, "ROS&BLF")
# 使用原始名称作为目标文件夹名
target_dir = os.path.join(rosblf_dir, date_folder)
os.makedirs(target_dir, exist_ok=True)
new_folders.append(f"{date_str}/ROS&BLF/{date_folder}")
# 记录此日期文件夹是否有新增内容
date_folder_has_new = False
# 递归同步日期文件夹下的所有内容
for root, dirs, files in os.walk(date_folder_path):
# 处理当前目录下的所有文件
for file in files:
src_file = os.path.join(root, file)
try:
# 计算目标路径
rel_path = os.path.relpath(root, date_folder_path)
dest_path = os.path.join(target_dir, rel_path, file)
# 复制文件
if self.copy_large_file(src_file, dest_path):
new_files.append(dest_path)
date_folder_has_new = True
except Exception as e:
logger.error(f"处理文件失败 {src_file}: {str(e)}")
else:
# 无法提取日期时间部分时使用原始名称作为目录
logger.warning(f"无法提取日期时间部分,使用原始名称: {date_folder}")
# 创建目录层级: 原始名称/ROS&BLF/原始名称
folder_dir = os.path.join(base_target_dir, date_folder)
rosblf_dir = os.path.join(folder_dir, "ROS&BLF")
target_dir = os.path.join(rosblf_dir, date_folder)
os.makedirs(target_dir, exist_ok=True)
new_folders.append(f"{date_folder}/ROS&BLF/{date_folder}")
# 递归同步文件夹内容
for root, dirs, files in os.walk(date_folder_path):
for file in files:
src_file = os.path.join(root, file)
try:
rel_path = os.path.relpath(root, date_folder_path)
dest_path = os.path.join(target_dir, rel_path, file)
if self.copy_large_file(src_file, dest_path):
new_files.append(dest_path)
except Exception as e:
logger.error(f"处理文件失败 {src_file}: {str(e)}")
else:
logger.info(f"文件夹未更新,跳过: {date_folder} (修改时间: {folder_mtime_dt})")
except Exception as e:
logger.error(f"处理日期文件夹 {date_folder} 失败: {str(e)}")
continue
# 处理同步时间戳的逻辑
if found_folder_to_sync:
# 如果有需要同步的文件夹,更新为本次同步的开始时间
self.sync_state['last_sync_times'][share_key] = sync_start_time
logger.info(f"有新增数据,更新 {share_key} 的同步时间为: {datetime.datetime.fromtimestamp(sync_start_time)}")
else:
if not self.first_run_completed:
# 首次同步但没有需要同步的文件夹,设置为配置的初始时间
self.sync_state['last_sync_times'][share_key] = self.initial_sync_time
logger.info(
f"首次同步无新增数据,设置 {share_key} 的同步时间为配置的初始时间: {datetime.datetime.fromtimestamp(self.initial_sync_time)}")
else:
# 非首次同步但没有需要同步的文件夹,保持上次同步时间不变
logger.info(f"无新增数据,保持 {share_key} 的上次同步时间不变")
# 记录同步结束时间用于日志
sync_end_time = time.time()
sync_duration = sync_end_time - sync_start_time
logger.info(f"同步完成: {share_key}, 新增文件夹: {len(new_folders)}, 新增文件: {len(new_files)}")
logger.info(f"同步开始时间: {datetime.datetime.fromtimestamp(sync_start_time)}")
logger.info(f"同步结束时间: {datetime.datetime.fromtimestamp(sync_end_time)}")
logger.info(f"同步持续时间: {sync_duration:.2f}秒")
if new_folders:
logger.info(f"新增文件夹列表: {new_folders}")
if new_files:
logger.info(f"新增文件列表: {new_files[:5]}") # 只显示前5个文件避免日志过长
def run_sync(self):
"""执行所有文件夹同步"""
logger.info("=" * 60)
logger.info(f"开始同步任务 - {datetime.datetime.now()}")
# 记录是否已完成首次同步
original_first_run_completed = self.first_run_completed
for share_key, folder_info in self.config['ft_share_paths'].items():
try:
logger.info("-" * 50)
logger.info(f"处理共享文件夹: {share_key}")
self.sync_folder(share_key, folder_info)
except Exception as e:
logger.error(f"同步 {share_key} 失败: {str(e)}")
# 如果这是首次同步(first_run_completed为false),则在一轮同步完成后更新状态
if not original_first_run_completed:
self.sync_state['first_run_completed'] = True
self.first_run_completed = True
logger.info(f"首次同步已完成,后续同步将基于sync_state.json中的last_sync_times")
# 保存同步状态
self.save_sync_state()
logger.info(f"同步任务完成 - {datetime.datetime.now()}")
def calculate_next_run(self):
"""计算下次运行时间"""
now = datetime.datetime.now()
if not self.first_run_completed:
# 首次运行 - 使用配置的sync_time
try:
sync_time_str = self.config['sync_time']
target_time = datetime.datetime.strptime(sync_time_str, "%Y-%m-%d %H:%M")
logger.info(f"首次运行计划时间: {target_time}")
# 如果当前时间已过设定时间,则立即执行
if now > target_time:
logger.info("首次运行时间已过,将立即执行")
return now
return target_time
except Exception as e:
logger.error(f"解析首次运行时间失败: {e}, 将立即执行")
return now
else:
# 后续运行 - 使用interval间隔
interval_minutes = self.config['interval']
logger.info(f"后续运行将使用间隔时间: {interval_minutes} 分钟")
return now + datetime.timedelta(minutes=interval_minutes)
def schedule_sync(self):
"""定时任务调度"""
while True:
# 计算下次运行时间
next_run_time = self.calculate_next_run()
now = datetime.datetime.now()
wait_seconds = (next_run_time - now).total_seconds()
# 确保等待时间为正数
if wait_seconds < 0:
wait_seconds = 0
logger.info(f"下次同步时间: {next_run_time}, 等待 {wait_seconds / 60:.1f} 分钟")
# 等待到同步时间
time.sleep(wait_seconds)
# 执行同步
self.run_sync()
def run(self):
"""启动同步服务"""
try:
logger.info("=" * 60)
logger.info("文件夹同步服务启动")
logger.info(f"当前同步状态: {json.dumps(self.sync_state, indent=2)}")
self.schedule_sync()
except KeyboardInterrupt:
logger.info("用户中断服务")
except Exception as e:
logger.exception(f"服务运行异常: {str(e)}")
finally:
logger.info("服务已停止")
if __name__ == "__main__":
# 配置文件路径
CONFIG_PATH = "config.json"
# 创建并启动同步器
syncer = SharedFolderSyncer(CONFIG_PATH)
syncer.run()
config.json:
{
"sync_time": "2025-12-03 14:30",
"ft_share_paths": {
"share_folder_1": {
"module": "FS11-A5",
"car_number": "394",
"route": "//intelligent.sto.prod.geely.svc/ISD_file01/1B_Disk_TestTeam_Biange/J6M/韩常炎/hcy"
},
"share_folder_2": {
"module": "FS11-A6",
"car_number": "395",
"route": "//intelligent.sto.prod.geely.svc/ISD_file01/1B_Disk_TestTeam_Biange/J6M/韩常炎/wh"
},
"share_folder_3": {
"module": "FS11-A7",
"car_number": "396",
"route": "//intelligent.sto.prod.geely.svc/ISD_file01/1B_Disk_TestTeam_Biange/J6M/韩常炎/fyy"
}
},
"geely_save_path": "E:/share_test",
"interval": 10
}
© 版权声明
文章版权归作者所有,未经允许请勿转载。
相关文章
暂无评论...
