#!/usr/bin/env python3
"""
Ubuntu本地文件上传脚本
功能:将本地文件夹上传到另一个本地位置,包含空间检查、断点续传、文件校验和权限设置
日志文件以脚本开始执行时间命名:YYYYMMDD_HHMM.log
作者:AI助手
版本:3.2-ubuntu
"""
import os
import sys
import shutil
import time
import logging
import hashlib
import platform
import pickle
from pathlib import Path
from datetime import datetime
# Module-wide name of the current log file; filled in by setup_logging().
LOG_FILENAME = None


def setup_logging():
    """Configure logging with a timestamp-named file plus console output.

    Creates a ``logs`` directory when missing, names the log file after the
    script start time (``YYYYMMDD_HHMM.log``), records that name in the
    module-global ``LOG_FILENAME``, and returns the configured logger.
    """
    global LOG_FILENAME
    started_at = datetime.now()
    LOG_FILENAME = "{}.log".format(started_at.strftime("%Y%m%d_%H%M"))
    # Make sure the logs/ directory exists before the FileHandler opens it.
    log_dir = Path("logs")
    log_dir.mkdir(exist_ok=True)
    log_path = log_dir / LOG_FILENAME
    # One handler for the timestamped file, one mirroring to stdout.
    handlers = [
        logging.FileHandler(log_path, encoding='utf-8'),
        logging.StreamHandler(sys.stdout),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=handlers,
    )
    log = logging.getLogger(__name__)
    log.info(f"日志文件创建: {log_path}")
    log.info(f"脚本启动时间: {started_at.strftime('%Y-%m-%d %H:%M:%S')}")
    return log


logger = setup_logging()
class LocalPathUploader:
    """Resumable local-to-local folder uploader with space checks,
    size/MD5 verification and permission setup.

    Progress is persisted to a pickle file after every file so an
    interrupted run can be resumed.
    """

    def __init__(self, source_path, destination_path, progress_file=None):
        """Initialize the uploader (Ubuntu local-path version).

        Args:
            source_path (str): source folder path
            destination_path (str): destination local path
            progress_file (str): progress-record file path; auto-generated
                from the current time when None
        """
        self.source_path = Path(source_path)
        self.destination_path = Path(destination_path)
        # When no progress file is given, derive one from the current time.
        if progress_file is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M")
            self.progress_file = Path(f"upload_progress_{timestamp}.pkl")
        else:
            self.progress_file = Path(progress_file)
        self.total_files = 0
        self.total_size = 0
        self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Resume state; persisted to self.progress_file via pickle.
        self.progress_data = {
            'session_id': self.session_id,
            'start_time': None,
            'total_files': 0,
            'total_size': 0,
            'copied_files': 0,
            'copied_size': 0,
            'completed_files': [],
            'failed_files': [],
            'log_file': LOG_FILENAME,
            'last_update': None
        }
        # Load previous progress, if a progress file already exists.
        self._load_progress()
        logger.info(f"上传会话ID: {self.session_id}")
        logger.info(f"进度文件: {self.progress_file}")

    def _load_progress(self):
        """Load previously saved transfer progress, if any.

        Returns:
            bool: True when an existing progress file was loaded.
        """
        if self.progress_file.exists():
            try:
                # NOTE(review): pickle.load trusts the file's contents; only
                # load progress files this script itself produced.
                with open(self.progress_file, 'rb') as f:
                    loaded_data = pickle.load(f)
                # Distinguish resuming the current session from an older one.
                if 'session_id' in loaded_data and loaded_data['session_id'] == self.session_id:
                    logger.info(f"加载当前会话的进度")
                else:
                    logger.info(f"加载之前会话的进度 (会话ID: {loaded_data.get('session_id', '未知')})")
                self.progress_data.update(loaded_data)
                completed_count = len(self.progress_data['completed_files'])
                logger.info(f"加载传输进度: {completed_count}个文件已完成")
                if self.progress_data['last_update']:
                    logger.info(f"上次进度更新时间: {self.progress_data['last_update']}")
                return True
            except Exception as e:
                logger.warning(f"加载进度文件失败: {e}")
        else:
            logger.info("未找到进度文件,开始新的上传任务")
        return False

    def _save_progress(self):
        """Persist the current transfer progress to the pickle file.

        Returns:
            bool: True when the file was written successfully.
        """
        try:
            self.progress_data['last_update'] = datetime.now().isoformat()
            self.progress_data['log_file'] = LOG_FILENAME
            with open(self.progress_file, 'wb') as f:
                pickle.dump(self.progress_data, f)
            logger.debug(f"进度已保存到: {self.progress_file}")
            return True
        except Exception as e:
            logger.error(f"保存进度文件失败: {e}")
            return False

    def _clear_progress(self):
        """Archive the progress file after a fully successful upload.

        The file is renamed with a ``_completed_<timestamp>`` suffix rather
        than deleted, so finished runs stay inspectable.
        """
        try:
            if self.progress_file.exists():
                completion_time = datetime.now().strftime("%Y%m%d_%H%M%S")
                completed_file = self.progress_file.with_name(
                    f"{self.progress_file.stem}_completed_{completion_time}.pkl")
                self.progress_file.rename(completed_file)
                logger.info(f"进度文件已存档: {completed_file}")
            return True
        except Exception as e:
            logger.warning(f"清除进度文件失败: {e}")
            return False

    def validate_paths(self):
        """Validate the source path and ensure the destination exists.

        Raises:
            FileNotFoundError: when the source path does not exist.
            ValueError: when the source path is not a directory.

        Returns:
            bool: True when the destination is usable, False otherwise.
        """
        if not self.source_path.exists():
            raise FileNotFoundError(f"源路径不存在: {self.source_path}")
        if not self.source_path.is_dir():
            raise ValueError(f"源路径不是文件夹: {self.source_path}")
        # Create the destination directory tree when missing.
        try:
            self.destination_path.mkdir(parents=True, exist_ok=True)
            logger.info(f"路径验证成功: 源路径={self.source_path}, 目标路径={self.destination_path}")
            return True
        except Exception as e:
            logger.error(f"无法访问目标路径 {self.destination_path}: {e}")
            return False

    def calculate_source_size(self):
        """Walk the source tree and record total file count and byte size.

        Also mirrors the totals into the resumable progress data.

        Returns:
            int: total size of all readable files in bytes.
        """
        total_size = 0
        total_files = 0
        logger.info("正在计算源文件夹大小...")
        for file_path in self.source_path.rglob('*'):
            if file_path.is_file():
                try:
                    total_size += file_path.stat().st_size
                    total_files += 1
                except Exception as e:
                    logger.warning(f"无法访问文件 {file_path}: {e}")
        self.total_size = total_size
        self.total_files = total_files
        # Keep the persisted progress record in sync with the fresh scan.
        self.progress_data['total_files'] = total_files
        self.progress_data['total_size'] = total_size
        logger.info(f"源文件夹统计: 文件数={total_files}, 总大小={self._format_size(total_size)}")
        return total_size

    def check_disk_space(self, required_size=None):
        """Return True when the destination filesystem has enough free space.

        A 10% headroom buffer is added on top of ``required_size`` (which
        defaults to the pre-computed total source size).
        """
        try:
            if required_size is None:
                required_size = self.total_size
            # statvfs reports the filesystem holding the destination path.
            statvfs = os.statvfs(str(self.destination_path))
            free_space = statvfs.f_frsize * statvfs.f_bavail
            logger.info(f"目标磁盘可用空间: {self._format_size(free_space)}")
            logger.info(f"所需空间: {self._format_size(required_size)}")
            # Require 10% more than the strict size.
            required_with_buffer = required_size * 1.1
            has_space = free_space >= required_with_buffer
            if not has_space:
                logger.warning(
                    f"磁盘空间不足! 需要: {self._format_size(required_with_buffer)}, 可用: {self._format_size(free_space)}")
            return has_space
        except Exception as e:
            logger.error(f"检查磁盘空间失败: {e}")
            return False

    def wait_for_space(self, required_size=None):
        """Block until the destination has enough space.

        Polls every 5 minutes; note this loops indefinitely and only ever
        returns True.
        """
        logger.info("磁盘空间不足,开始等待...")
        wait_count = 0
        while True:
            if self.check_disk_space(required_size):
                logger.info("磁盘空间已足够,继续上传...")
                return True
            wait_count += 1
            logger.info(f"空间仍然不足 (第{wait_count}次检查),5分钟后重新检查...")
            time.sleep(300)  # wait 5 minutes between checks

    def copy_file_with_progress(self, src, dst, file_size):
        """Copy a single file in chunks, checking disk space as it goes.

        Args:
            src: source file path
            dst: destination file path
            file_size: size of the file in bytes

        Returns:
            bool: True when the copy (including metadata) succeeded.
        """
        try:
            # Refuse to start when the destination lacks room for this file.
            if not self.check_disk_space(file_size):
                logger.error(f"磁盘空间不足,无法复制文件: {src}")
                return False
            # Make sure the destination's parent directory exists.
            dst.parent.mkdir(parents=True, exist_ok=True)
            # Chunked copy so large files never sit fully in memory.
            chunk_size = 1024 * 1024 * 64  # 64MB chunks
            with open(src, 'rb') as f_src, open(dst, 'wb') as f_dst:
                while True:
                    # Re-check free space before writing each chunk.
                    if not self.check_disk_space(chunk_size):
                        logger.error(f"复制过程中磁盘空间不足: {src}")
                        return False
                    chunk = f_src.read(chunk_size)
                    if not chunk:
                        break
                    f_dst.write(chunk)
            # Preserve the source file's timestamps/permission bits.
            shutil.copystat(src, dst)
            return True
        except Exception as e:
            logger.error(f"复制文件失败 {src} -> {dst}: {str(e)}")
            return False

    def is_file_completed(self, relative_path):
        """Return True when the file is already fully uploaded.

        A file counts as completed when it is in the recorded completed
        list, or when the destination copy exists with matching size and
        MD5 hash (in which case it is added to the completed list).

        Args:
            relative_path: file path relative to the source root.
        """
        # Fast path: already recorded as completed.
        if str(relative_path) in self.progress_data['completed_files']:
            return True
        # Slow path: compare the on-disk destination copy with the source.
        source_file = self.source_path / relative_path
        destination_file = self.destination_path / relative_path
        if destination_file.exists():
            try:
                if (destination_file.stat().st_size == source_file.stat().st_size and
                        self.get_file_hash(source_file) == self.get_file_hash(destination_file)):
                    # Identical file already present: record and persist it.
                    self.progress_data['completed_files'].append(str(relative_path))
                    self._save_progress()
                    return True
            except Exception as e:
                logger.warning(f"检查文件状态失败 {destination_file}: {e}")
        return False

    def upload_files(self):
        """Upload every file under the source tree with resume support.

        Skips files already recorded as complete, saves progress after each
        file, and waits for disk space to recover (then retries once) when
        a copy fails for lack of space.

        Returns:
            bool: True when no file ultimately failed.
        """
        logger.info("开始上传文件...")
        start_time = time.time()
        self.progress_data['start_time'] = start_time
        self.progress_data['session_id'] = self.session_id
        self._save_progress()
        success_count = 0
        fail_count = 0
        skipped_count = 0
        # Recreate the whole directory structure first.
        logger.info("创建目标目录结构...")
        dirs_created = 0
        for dir_path in self.source_path.rglob('*'):
            if dir_path.is_dir():
                relative_path = dir_path.relative_to(self.source_path)
                destination_dir = self.destination_path / relative_path
                destination_dir.mkdir(parents=True, exist_ok=True)
                dirs_created += 1
        logger.info(f"创建了 {dirs_created} 个目录")
        # Collect every file that may need uploading.
        logger.info("扫描源文件夹中的文件...")
        all_files = []
        for file_path in self.source_path.rglob('*'):
            if file_path.is_file():
                relative_path = file_path.relative_to(self.source_path)
                all_files.append((file_path, relative_path))
        logger.info(f"找到 {len(all_files)} 个文件需要处理")
        # Then copy the files one by one.
        for idx, (file_path, relative_path) in enumerate(all_files, 1):
            destination_file = self.destination_path / relative_path
            # Resume support: skip anything already done.
            if self.is_file_completed(relative_path):
                logger.debug(f"文件已存在且相同,跳过: {relative_path}")
                skipped_count += 1
                continue
            file_size = file_path.stat().st_size
            logger.info(f"上传文件 [{idx}/{len(all_files)}]: {relative_path} ({self._format_size(file_size)})")
            # Copy the file.
            if self.copy_file_with_progress(file_path, destination_file, file_size):
                # Record the success and persist progress immediately.
                self.progress_data['completed_files'].append(str(relative_path))
                self.progress_data['copied_files'] += 1
                self.progress_data['copied_size'] += file_size
                self._save_progress()
                success_count += 1
                # Progress report. NOTE(review): assumes calculate_source_size
                # ran first so self.total_files is non-zero — confirm callers.
                progress = (len(self.progress_data['completed_files']) / self.total_files) * 100
                copied_size_fmt = self._format_size(self.progress_data['copied_size'])
                total_size_fmt = self._format_size(self.total_size)
                if idx % 10 == 0 or idx == len(all_files):  # every 10 files or on the last file
                    logger.info(
                        f"上传进度: {len(self.progress_data['completed_files'])}/{self.total_files} ({progress:.1f}%) - {copied_size_fmt}/{total_size_fmt}")
            else:
                # Record the failure and persist progress.
                self.progress_data['failed_files'].append(str(relative_path))
                self._save_progress()
                fail_count += 1
                logger.error(f"文件上传失败: {relative_path}")
                # If the failure looks space-related, wait for space to recover.
                if not self.check_disk_space(file_size):
                    logger.error("磁盘空间不足,等待空间恢复...")
                    if self.wait_for_space(file_size):
                        logger.info("空间恢复,继续上传...")
                        # Retry the current file once.
                        if self.copy_file_with_progress(file_path, destination_file, file_size):
                            self.progress_data['completed_files'].append(str(relative_path))
                            if str(relative_path) in self.progress_data['failed_files']:
                                self.progress_data['failed_files'].remove(str(relative_path))
                            self.progress_data['copied_files'] += 1
                            self.progress_data['copied_size'] += file_size
                            self._save_progress()
                            success_count += 1
                            fail_count -= 1
                            logger.info(f"重试成功: {relative_path}")
                        else:
                            logger.error(f"重试失败: {relative_path}")
                    else:
                        # Unreachable in practice: wait_for_space loops forever.
                        logger.error("等待空间恢复失败,停止上传")
                        return False
        elapsed_time = time.time() - start_time
        transfer_rate = self.progress_data['copied_size'] / elapsed_time if elapsed_time > 0 else 0
        logger.info(f"上传完成统计:")
        logger.info(f" 成功: {success_count} 个文件")
        logger.info(f" 失败: {fail_count} 个文件")
        logger.info(f" 跳过: {skipped_count} 个文件")
        logger.info(f" 耗时: {self._format_time(elapsed_time)}")
        logger.info(f" 平均速率: {self._format_size(transfer_rate)}/s")
        # List any files that still failed at the end.
        if self.progress_data['failed_files']:
            logger.error(f"以下文件上传失败:")
            for failed_file in self.progress_data['failed_files']:
                logger.error(f" - {failed_file}")
        return fail_count == 0

    def verify_upload(self):
        """Verify the upload by comparing file sets and sizes.

        Compares every source file against the destination by relative
        path and byte size (no hash comparison in this pass).

        Returns:
            bool: True when nothing is missing or size-mismatched.
        """
        logger.info("开始验证上传的文件...")
        source_files = {}
        destination_files = {}
        # Gather source-side file info.
        logger.info("收集源文件信息...")
        source_file_count = 0
        for file_path in self.source_path.rglob('*'):
            if file_path.is_file():
                try:
                    relative_path = file_path.relative_to(self.source_path)
                    source_files[str(relative_path)] = {
                        'size': file_path.stat().st_size
                    }
                    source_file_count += 1
                except Exception as e:
                    logger.warning(f"无法读取源文件 {file_path}: {str(e)}")
        logger.info(f"收集到 {source_file_count} 个源文件信息")
        # Gather destination-side file info.
        logger.info("收集目标文件信息...")
        dest_file_count = 0
        for file_path in self.destination_path.rglob('*'):
            if file_path.is_file():
                try:
                    relative_path = file_path.relative_to(self.destination_path)
                    destination_files[str(relative_path)] = {
                        'size': file_path.stat().st_size
                    }
                    dest_file_count += 1
                except Exception as e:
                    logger.warning(f"无法读取目标文件 {file_path}: {str(e)}")
        logger.info(f"收集到 {dest_file_count} 个目标文件信息")
        # Compare the two sides.
        missing_files = []
        size_mismatch = []
        logger.info("比较文件...")
        for rel_path, src_info in source_files.items():
            if rel_path not in destination_files:
                missing_files.append(rel_path)
                continue
            dst_info = destination_files[rel_path]
            if src_info['size'] != dst_info['size']:
                size_mismatch.append(
                    f"{rel_path} (源: {self._format_size(src_info['size'])}, 目标: {self._format_size(dst_info['size'])})")
        # Report the verification result.
        if not missing_files and not size_mismatch:
            logger.info("✓ 验证通过:所有文件上传完整且正确")
            return True
        else:
            logger.error("✗ 验证失败:")
            if missing_files:
                logger.error(f" 缺失文件: {len(missing_files)}个")
                for f in missing_files[:10]:  # show only the first 10
                    logger.error(f" - {f}")
                if len(missing_files) > 10:
                    logger.error(f" ... 还有 {len(missing_files) - 10} 个文件未显示")
            if size_mismatch:
                logger.error(f" 大小不匹配: {len(size_mismatch)}个")
                for f in size_mismatch[:10]:
                    logger.error(f" - {f}")
                if len(size_mismatch) > 10:
                    logger.error(f" ... 还有 {len(size_mismatch) - 10} 个文件未显示")
            return False

    def set_permissions(self):
        """Recursively chmod everything under the destination to 0o777.

        NOTE(review): world-writable permissions appear intentional for the
        shared drive, but are a security trade-off — confirm policy.

        Returns:
            bool: True when the walk completed (per-entry chmod failures
            are logged as warnings, not counted as failure).
        """
        logger.info("正在设置文件夹权限...")
        try:
            files_processed = 0
            dirs_processed = 0
            for root, dirs, files in os.walk(str(self.destination_path)):
                # Directories first.
                for dir_name in dirs:
                    dir_path = os.path.join(root, dir_name)
                    try:
                        os.chmod(dir_path, 0o777)
                        dirs_processed += 1
                    except Exception as e:
                        logger.warning(f"无法设置目录权限 {dir_path}: {e}")
                # Then files.
                for file_name in files:
                    file_path = os.path.join(root, file_name)
                    try:
                        os.chmod(file_path, 0o777)
                        files_processed += 1
                    except Exception as e:
                        logger.warning(f"无法设置文件权限 {file_path}: {e}")
            logger.info(f"✓ 权限设置完成 (777): {dirs_processed}个目录, {files_processed}个文件")
            return True
        except Exception as e:
            logger.error(f"设置权限失败: {str(e)}")
            return False

    def get_file_hash(self, file_path, block_size=65536):
        """Return the MD5 hex digest of a file, or None when unreadable.

        MD5 is used purely as an integrity check here, not for security.
        """
        hasher = hashlib.md5()
        try:
            with open(file_path, 'rb') as f:
                for block in iter(lambda: f.read(block_size), b""):
                    hasher.update(block)
            return hasher.hexdigest()
        except Exception as e:
            logger.warning(f"无法计算文件哈希 {file_path}: {str(e)}")
            return None

    def _format_size(self, size_bytes):
        """Format a byte count as a human-readable string (e.g. '1.50 MB')."""
        if size_bytes == 0:
            return "0 B"
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.2f} {unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.2f} PB"

    def _format_time(self, seconds):
        """Format a duration in seconds as seconds/minutes/hours text."""
        if seconds < 60:
            return f"{seconds:.1f}秒"
        elif seconds < 3600:
            minutes = seconds / 60
            return f"{minutes:.1f}分钟"
        else:
            hours = seconds / 3600
            return f"{hours:.1f}小时"

    def get_progress_summary(self):
        """Return a one-line human-readable progress summary string."""
        if self.progress_data['total_files'] == 0:
            return "暂无进度信息"
        completed = len(self.progress_data['completed_files'])
        total = self.progress_data['total_files']
        progress_percent = (completed / total) * 100 if total > 0 else 0
        copied_size = self.progress_data['copied_size']
        total_size = self.progress_data['total_size']
        size_percent = (copied_size / total_size) * 100 if total_size > 0 else 0
        return (f"进度: {completed}/{total} 文件 ({progress_percent:.1f}%) | "
                f"{self._format_size(copied_size)}/{self._format_size(total_size)} ({size_percent:.1f}%)")
def main():
    """Run the full upload pipeline.

    Steps: validate paths, measure the source, check disk space (waiting
    when short), upload with resume support, verify, set permissions, and
    archive the progress file only when everything succeeded.

    Returns:
        bool: True on full success, False on any fatal failure.
    """
    # Default configuration - change to your actual paths.
    SOURCE_DIR = "/media/geely/E245/20251126"  # source folder path
    DESTINATION_DIR = "/home/geely/ISD_file01/share_file/1B_Disk_TestTeam_Biange/J6M/P181/565/20251126"  # destination share path
    PROGRESS_FILE = None  # None -> the uploader generates a progress file name
    # Optional command-line overrides: argv[1]=source, argv[2]=destination,
    # argv[3]=progress file.
    if len(sys.argv) >= 3:
        SOURCE_DIR = sys.argv[1]
        DESTINATION_DIR = sys.argv[2]
    if len(sys.argv) >= 4:
        PROGRESS_FILE = sys.argv[3]
    # Startup banner.
    logger.info("=" * 70)
    logger.info("Ubuntu本地文件上传脚本启动 - 支持断点续传")
    logger.info(f"操作系统: {platform.system()} {platform.release()}")
    logger.info(f"Python版本: {platform.python_version()}")
    logger.info(f"源路径: {SOURCE_DIR}")
    logger.info(f"目标路径: {DESTINATION_DIR}")
    logger.info(f"日志文件: logs/{LOG_FILENAME}")
    logger.info("=" * 70)
    try:
        # Build the uploader (this also loads any saved progress).
        uploader = LocalPathUploader(SOURCE_DIR, DESTINATION_DIR, PROGRESS_FILE)
        # Show resumed progress, if any.
        progress_summary = uploader.get_progress_summary()
        logger.info(f"当前进度: {progress_summary}")
        # 1. Validate paths.
        logger.info("步骤1: 验证路径...")
        if not uploader.validate_paths():
            logger.error("路径验证失败!")
            return False
        # 2. Measure the source tree.
        logger.info("步骤2: 计算源文件夹大小...")
        source_size = uploader.calculate_source_size()
        if source_size == 0:
            logger.error("源文件夹为空或无法访问!")
            return False
        # 3. Check disk space, waiting for it to free up when necessary.
        logger.info("步骤3: 检查磁盘空间...")
        if not uploader.check_disk_space():
            logger.warning("磁盘空间不足!")
            if not uploader.wait_for_space():
                logger.error("等待空间超时或失败!")
                return False
        # 4. Upload the files (resume-aware).
        logger.info("步骤4: 开始上传文件...")
        upload_success = uploader.upload_files()
        if not upload_success:
            logger.error("文件上传过程中出现错误!")
            # Do not return yet - only some files may have failed.
        # 5. Verify the uploaded files.
        logger.info("步骤5: 验证上传文件...")
        verify_success = uploader.verify_upload()
        if not verify_success:
            logger.error("文件验证失败!")
            return False
        # 6. Set permissions on the destination tree.
        logger.info("步骤6: 设置文件权限...")
        permission_success = uploader.set_permissions()
        if not permission_success:
            logger.error("权限设置失败!")
            return False
        # 7. Archive the progress file only when everything succeeded.
        if upload_success and verify_success and permission_success:
            uploader._clear_progress()
        # Completion banner.
        end_time = datetime.now()
        logger.info("=" * 70)
        logger.info("✓ 所有操作完成!")
        logger.info(f"脚本结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info(f"日志文件保存在: logs/{LOG_FILENAME}")
        logger.info("=" * 70)
        return True
    except Exception as e:
        logger.error(f"脚本执行失败: {str(e)}")
        import traceback
        logger.debug(traceback.format_exc())
        # Record when the failure happened and where to look.
        logger.error(f"错误发生时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.error(f"日志文件: logs/{LOG_FILENAME}")
        return False
if __name__ == "__main__":
    # Show usage when -h/--help is passed. Bug fix: the original branch
    # exited without printing anything despite its "show usage" intent.
    if len(sys.argv) > 1 and (sys.argv[1] == '-h' or sys.argv[1] == '--help'):
        print("用法: python3 AutoTransferUbuntu.py [源路径 目标路径 [进度文件]]")
        sys.exit(0)
    success = main()
    # Exit status: 0 on full success, 1 otherwise.
    sys.exit(0 if success else 1)