Python:数据同步脚本(AutoSynchronizeFileCopy)需要配置文件

内容分享1天前发布
0 0 0

文件夹层级
AutoSynchronizeFileCopy.py
config.json

AutoSynchronizeFileCopy.py :


import os
import re
import json
import time
import shutil
import logging
import datetime
from typing import Dict, Any, List, Tuple, Optional

# 配置日志格式
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("sync_log.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger()


class SharedFolderSyncer:
    def __init__(self, config_path: str):
        self.config = self.load_config(config_path)
        self.state_file = "sync_state.json"
        self.sync_state = self.load_sync_state()

        # 创建保存根目录
        os.makedirs(self.config['geely_save_path'], exist_ok=True)
        logger.info(f"保存根目录已创建: {self.config['geely_save_path']}")

        # 记录共享路径连接状态
        self.connection_status = {}

        # 标记是否已完成首次同步
        self.first_run_completed = self.sync_state.get('first_run_completed', False)
        logger.info(f"首次同步状态: {'已完成' if self.first_run_completed else '未完成'}")

        # 解析首次同步时间
        self.initial_sync_time = self.parse_initial_sync_time()
        logger.info(f"首次同步基准时间: {datetime.datetime.fromtimestamp(self.initial_sync_time)}")

    def parse_initial_sync_time(self) -> float:
        """解析首次同步时间字符串为时间戳"""
        try:
            sync_time_str = self.config['sync_time']
            sync_dt = datetime.datetime.strptime(sync_time_str, "%Y-%m-%d %H:%M")
            return sync_dt.timestamp()
        except Exception as e:
            logger.error(f"解析首次同步时间失败: {e}, 将使用当前时间")
            return time.time()

    def load_config(self, path: str) -> Dict[str, Any]:
        """加载配置文件"""
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"加载配置文件失败: {e}")
            raise

    def load_sync_state(self) -> Dict[str, Any]:
        """加载同步状态文件(记录最后同步时间和首次运行状态)"""
        state = {
            'first_run_completed': False,
            'last_sync_times': {k: 0.0 for k in self.config['ft_share_paths'].keys()}
        }

        if os.path.exists(self.state_file):
            try:
                with open(self.state_file, 'r') as f:
                    saved_state = json.load(f)
                    # 兼容旧版本和新版本状态文件
                    if 'first_run_completed' in saved_state:
                        state['first_run_completed'] = saved_state['first_run_completed']
                    if 'last_sync_times' in saved_state:
                        state['last_sync_times'] = saved_state['last_sync_times']
                    else:
                        # 旧版本状态文件只有同步时间
                        for k in self.config['ft_share_paths'].keys():
                            state['last_sync_times'][k] = saved_state.get(k, 0.0)
            except Exception as e:
                logger.error(f"加载状态文件失败: {e}")
                # 状态文件损坏时创建新文件
                self.save_sync_state(state)

        return state

    def save_sync_state(self):
        """保存同步状态"""
        try:
            with open(self.state_file, 'w') as f:
                json.dump(self.sync_state, f, indent=2)
            logger.info("同步状态已保存")
            logger.info(f"当前同步状态: {json.dumps(self.sync_state, indent=2)}")
        except Exception as e:
            logger.error(f"保存状态文件失败: {e}")

    def check_share_connection(self, share_path: str) -> bool:
        """检查共享路径是否可访问"""
        try:
            # 尝试访问共享路径
            if os.path.exists(share_path):
                self.connection_status[share_path] = True
                logger.info(f"成功连接到共享路径: {share_path}")
                return True
            else:
                self.connection_status[share_path] = False
                logger.warning(f"无法连接到共享路径: {share_path}")
                return False
        except Exception as e:
            self.connection_status[share_path] = False
            logger.error(f"检查共享路径连接失败: {share_path}, 错误: {str(e)}")
            return False

    def extract_datetime_parts(self, folder_name: str) -> Optional[Tuple[str, str]]:
        """
        从文件夹名称中提取日期和时间部分
        例如: "163878_20250809_145458" -> ("20250809", "145458")
        """
        # 匹配下划线分隔的8位日期和6位时间
        match = re.search(r'(d{8})_(d{6})', folder_name)
        if match:
            return match.group(1), match.group(2)

        # 尝试其他可能的格式
        match = re.search(r'(d{8})-(d{6})', folder_name)
        if match:
            return match.group(1), match.group(2)

        # 尝试纯日期时间格式
        match = re.search(r'(d{8})_?(d{6})', folder_name)
        if match:
            return match.group(1), match.group(2)

        logger.warning(f"无法从文件夹名称提取日期时间部分: {folder_name}")
        return None

    def copy_large_file(self, src: str, dst: str, buffer_size=1024 * 1024 * 100) -> bool:
        """分块复制大文件避免内存溢出"""
        try:
            # 确保目标目录存在
            os.makedirs(os.path.dirname(dst), exist_ok=True)

            # 复制文件内容
            with open(src, 'rb') as f_src:
                with open(dst, 'wb') as f_dst:
                    while True:
                        chunk = f_src.read(buffer_size)
                        if not chunk:
                            break
                        f_dst.write(chunk)

            # 保留原始修改时间
            shutil.copystat(src, dst)
            logger.info(f"成功复制文件: {src} -> {dst}")
            return True
        except Exception as e:
            logger.error(f"复制文件失败 {src} -> {dst}: {str(e)}")
            # 清理部分复制的文件
            if os.path.exists(dst):
                try:
                    os.remove(dst)
                except:
                    pass
            return False

    def sync_folder(self, share_key: str, folder_info: Dict[str, str]):
        """同步单个共享文件夹及其所有内容(基于文件夹修改时间)"""
        share_path = folder_info['route']
        module = folder_info['module']
        car_number = folder_info['car_number']

        # 在开始同步前记录时间戳
        sync_start_time = time.time()
        logger.info(f"共享文件夹 {share_key} 同步开始时间: {datetime.datetime.fromtimestamp(sync_start_time)}")

        # 确定上次同步时间
        if not self.first_run_completed:
            # 首次同步使用config.json中的sync_time
            last_sync = self.initial_sync_time
            logger.info(f"首次同步使用配置的初始同步时间: {datetime.datetime.fromtimestamp(last_sync)}")
        else:
            # 后续同步使用sync_state.json中的last_sync_times
            last_sync = self.sync_state['last_sync_times'].get(share_key, 0.0)
            logger.info(f"上次同步时间: {datetime.datetime.fromtimestamp(last_sync)}")

        # 检查共享路径连接
        if not self.check_share_connection(share_path):
            logger.error(f"无法访问共享路径,跳过同步: {share_path}")
            # 即使无法连接,如果是首次同步,也要设置last_sync_times为initial_sync_time
            if not self.first_run_completed:
                self.sync_state['last_sync_times'][share_key] = self.initial_sync_time
                logger.info(f"首次同步无法访问共享路径 {share_key},已设置其同步时间为配置的初始时间")
            return

        # 创建基础目标目录
        base_target_dir = os.path.join(
            self.config['geely_save_path'],
            module,
            car_number
        )
        os.makedirs(base_target_dir, exist_ok=True)

        logger.info(f"开始同步: {share_key}")
        logger.info(f"源路径: {share_path}")
        logger.info(f"目标基础路径: {base_target_dir}")

        # 记录新增内容
        new_folders = []
        new_files = []

        # 标记是否有找到需要同步的文件夹
        found_folder_to_sync = False

        # 遍历共享文件夹下的所有日期文件夹
        for date_folder in os.listdir(share_path):
            date_folder_path = os.path.join(share_path, date_folder)

            # 跳过文件,只处理文件夹
            if not os.path.isdir(date_folder_path):
                continue

            try:
                # 获取文件夹修改时间
                folder_mtime = os.path.getmtime(date_folder_path)
                folder_mtime_dt = datetime.datetime.fromtimestamp(folder_mtime)

                # 检查文件夹是否更新
                if folder_mtime > last_sync:
                    found_folder_to_sync = True

                    # 提取日期和时间部分
                    datetime_parts = self.extract_datetime_parts(date_folder)

                    if datetime_parts:
                        date_str, time_str = datetime_parts
                        logger.info(f"处理日期文件夹: {date_folder} -> 日期: {date_str}, 时间: {time_str}")

                        # 创建目录层级: 日期/ROS&BLF/原始名称
                        date_dir = os.path.join(base_target_dir, date_str)
                        rosblf_dir = os.path.join(date_dir, "ROS&BLF")
                        # 使用原始名称作为目标文件夹名
                        target_dir = os.path.join(rosblf_dir, date_folder)
                        os.makedirs(target_dir, exist_ok=True)
                        new_folders.append(f"{date_str}/ROS&BLF/{date_folder}")

                        # 记录此日期文件夹是否有新增内容
                        date_folder_has_new = False

                        # 递归同步日期文件夹下的所有内容
                        for root, dirs, files in os.walk(date_folder_path):
                            # 处理当前目录下的所有文件
                            for file in files:
                                src_file = os.path.join(root, file)
                                try:
                                    # 计算目标路径
                                    rel_path = os.path.relpath(root, date_folder_path)
                                    dest_path = os.path.join(target_dir, rel_path, file)

                                    # 复制文件
                                    if self.copy_large_file(src_file, dest_path):
                                        new_files.append(dest_path)
                                        date_folder_has_new = True
                                except Exception as e:
                                    logger.error(f"处理文件失败 {src_file}: {str(e)}")
                    else:
                        # 无法提取日期时间部分时使用原始名称作为目录
                        logger.warning(f"无法提取日期时间部分,使用原始名称: {date_folder}")
                        # 创建目录层级: 原始名称/ROS&BLF/原始名称
                        folder_dir = os.path.join(base_target_dir, date_folder)
                        rosblf_dir = os.path.join(folder_dir, "ROS&BLF")
                        target_dir = os.path.join(rosblf_dir, date_folder)
                        os.makedirs(target_dir, exist_ok=True)
                        new_folders.append(f"{date_folder}/ROS&BLF/{date_folder}")

                        # 递归同步文件夹内容
                        for root, dirs, files in os.walk(date_folder_path):
                            for file in files:
                                src_file = os.path.join(root, file)
                                try:
                                    rel_path = os.path.relpath(root, date_folder_path)
                                    dest_path = os.path.join(target_dir, rel_path, file)

                                    if self.copy_large_file(src_file, dest_path):
                                        new_files.append(dest_path)
                                except Exception as e:
                                    logger.error(f"处理文件失败 {src_file}: {str(e)}")
                else:
                    logger.info(f"文件夹未更新,跳过: {date_folder} (修改时间: {folder_mtime_dt})")
            except Exception as e:
                logger.error(f"处理日期文件夹 {date_folder} 失败: {str(e)}")
                continue

        # 处理同步时间戳的逻辑
        if found_folder_to_sync:
            # 如果有需要同步的文件夹,更新为本次同步的开始时间
            self.sync_state['last_sync_times'][share_key] = sync_start_time
            logger.info(f"有新增数据,更新 {share_key} 的同步时间为: {datetime.datetime.fromtimestamp(sync_start_time)}")
        else:
            if not self.first_run_completed:
                # 首次同步但没有需要同步的文件夹,设置为配置的初始时间
                self.sync_state['last_sync_times'][share_key] = self.initial_sync_time
                logger.info(
                    f"首次同步无新增数据,设置 {share_key} 的同步时间为配置的初始时间: {datetime.datetime.fromtimestamp(self.initial_sync_time)}")
            else:
                # 非首次同步但没有需要同步的文件夹,保持上次同步时间不变
                logger.info(f"无新增数据,保持 {share_key} 的上次同步时间不变")

        # 记录同步结束时间用于日志
        sync_end_time = time.time()
        sync_duration = sync_end_time - sync_start_time

        logger.info(f"同步完成: {share_key}, 新增文件夹: {len(new_folders)}, 新增文件: {len(new_files)}")
        logger.info(f"同步开始时间: {datetime.datetime.fromtimestamp(sync_start_time)}")
        logger.info(f"同步结束时间: {datetime.datetime.fromtimestamp(sync_end_time)}")
        logger.info(f"同步持续时间: {sync_duration:.2f}秒")

        if new_folders:
            logger.info(f"新增文件夹列表: {new_folders}")
        if new_files:
            logger.info(f"新增文件列表: {new_files[:5]}")  # 只显示前5个文件避免日志过长

    def run_sync(self):
        """执行所有文件夹同步"""
        logger.info("=" * 60)
        logger.info(f"开始同步任务 - {datetime.datetime.now()}")

        # 记录是否已完成首次同步
        original_first_run_completed = self.first_run_completed

        for share_key, folder_info in self.config['ft_share_paths'].items():
            try:
                logger.info("-" * 50)
                logger.info(f"处理共享文件夹: {share_key}")
                self.sync_folder(share_key, folder_info)
            except Exception as e:
                logger.error(f"同步 {share_key} 失败: {str(e)}")

        # 如果这是首次同步(first_run_completed为false),则在一轮同步完成后更新状态
        if not original_first_run_completed:
            self.sync_state['first_run_completed'] = True
            self.first_run_completed = True
            logger.info(f"首次同步已完成,后续同步将基于sync_state.json中的last_sync_times")

        # 保存同步状态
        self.save_sync_state()

        logger.info(f"同步任务完成 - {datetime.datetime.now()}")

    def calculate_next_run(self):
        """计算下次运行时间"""
        now = datetime.datetime.now()

        if not self.first_run_completed:
            # 首次运行 - 使用配置的sync_time
            try:
                sync_time_str = self.config['sync_time']
                target_time = datetime.datetime.strptime(sync_time_str, "%Y-%m-%d %H:%M")
                logger.info(f"首次运行计划时间: {target_time}")

                # 如果当前时间已过设定时间,则立即执行
                if now > target_time:
                    logger.info("首次运行时间已过,将立即执行")
                    return now
                return target_time
            except Exception as e:
                logger.error(f"解析首次运行时间失败: {e}, 将立即执行")
                return now
        else:
            # 后续运行 - 使用interval间隔
            interval_minutes = self.config['interval']
            logger.info(f"后续运行将使用间隔时间: {interval_minutes} 分钟")
            return now + datetime.timedelta(minutes=interval_minutes)

    def schedule_sync(self):
        """定时任务调度"""
        while True:
            # 计算下次运行时间
            next_run_time = self.calculate_next_run()
            now = datetime.datetime.now()
            wait_seconds = (next_run_time - now).total_seconds()

            # 确保等待时间为正数
            if wait_seconds < 0:
                wait_seconds = 0

            logger.info(f"下次同步时间: {next_run_time}, 等待 {wait_seconds / 60:.1f} 分钟")

            # 等待到同步时间
            time.sleep(wait_seconds)

            # 执行同步
            self.run_sync()

    def run(self):
        """启动同步服务"""
        try:
            logger.info("=" * 60)
            logger.info("文件夹同步服务启动")
            logger.info(f"当前同步状态: {json.dumps(self.sync_state, indent=2)}")
            self.schedule_sync()
        except KeyboardInterrupt:
            logger.info("用户中断服务")
        except Exception as e:
            logger.exception(f"服务运行异常: {str(e)}")
        finally:
            logger.info("服务已停止")


if __name__ == "__main__":
    # 配置文件路径
    CONFIG_PATH = "config.json"

    # 创建并启动同步器
    syncer = SharedFolderSyncer(CONFIG_PATH)
    syncer.run()

config.json:


{
  "sync_time": "2025-12-03 14:30",
  "ft_share_paths": {
    "share_folder_1": {
      "module": "FS11-A5",
      "car_number": "394",
      "route": "//intelligent.sto.prod.geely.svc/ISD_file01/1B_Disk_TestTeam_Biange/J6M/韩常炎/hcy"
    },
    "share_folder_2": {
      "module": "FS11-A6",
      "car_number": "395",
      "route": "//intelligent.sto.prod.geely.svc/ISD_file01/1B_Disk_TestTeam_Biange/J6M/韩常炎/wh"
    },
    "share_folder_3": {
      "module": "FS11-A7",
      "car_number": "396",
      "route": "//intelligent.sto.prod.geely.svc/ISD_file01/1B_Disk_TestTeam_Biange/J6M/韩常炎/fyy"
    }
  },
  "geely_save_path": "E:/share_test",
  "interval": 10
}

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
none
暂无评论...