在前面的例子中,我们学习了如何使用 asyncio 与 aiohttp 实现高并发网络请求。
本例将展示如何将 异步IO机制 应用于 文件读写操作,
使用 aiofiles 实现 非阻塞式文件访问。
问题描述:
我们希望实现一个异步日志系统:
- 从多个异步任务中并发写入日志文件;
- 使用 aiofiles 保证写入时不阻塞事件循环;
- 同时读取文件内容并输出。
⚙️代码示例:
import asyncio
import aiofiles
import time
# 异步写日志
async def write_log(filename, task_name, count):
async with aiofiles.open(filename, mode='a', encoding='utf-8') as f:
for i in range(count):
log_line = f"[{task_name}] 第{i+1}条日志,时间:{time.strftime('%X')}
"
await f.write(log_line)
print(f"✍️ 写入:{log_line.strip()}")
await asyncio.sleep(0.5) # 模拟写入间隔
# 异步读取日志
async def read_log(filename):
await asyncio.sleep(2) # 等待部分内容写入后再读取
async with aiofiles.open(filename, mode='r', encoding='utf-8') as f:
content = await f.read()
print("
当前日志内容:
" + content)
# 主协程
async def main():
filename = "async_log.txt"
# 清空旧日志
async with aiofiles.open(filename, 'w') as f:
await f.write("=== 异步日志系统开始 ===
")
# 并发执行多个日志写入任务
tasks = [
asyncio.create_task(write_log(filename, "任务A", 3)),
asyncio.create_task(write_log(filename, "任务B", 3)),
asyncio.create_task(read_log(filename)),
]
await asyncio.gather(*tasks)
print("
✅ 所有异步读写操作完成。")
# 运行
if __name__ == "__main__":
asyncio.run(main())
运行结果示例:
✍️ 写入:[任务A] 第1条日志,时间:14:20:01
✍️ 写入:[任务B] 第1条日志,时间:14:20:01
✍️ 写入:[任务A] 第2条日志,时间:14:20:02
✍️ 写入:[任务B] 第2条日志,时间:14:20:02
当前日志内容:
=== 异步日志系统开始 ===
[任务A] 第1条日志,时间:14:20:01
[任务B] 第1条日志,时间:14:20:01
[任务A] 第2条日志,时间:14:20:02
[任务B] 第2条日志,时间:14:20:02
✍️ 写入:[任务A] 第3条日志,时间:14:20:03
✍️ 写入:[任务B] 第3条日志,时间:14:20:03
✅ 所有异步读写操作完成。
可以看到写入与读取是交替发生的,
并且没有阻塞,体现了真正的异步并发文件操作。
工作原理解析:
|
组件 |
功能说明 |
|
aiofiles.open() |
异步打开文件,返回一个异步文件对象 |
|
await f.write() |
非阻塞写入数据到文件 |
|
await f.read() |
非阻塞读取文件内容 |
|
asyncio.gather() |
并发运行多个读写任务 |
|
await asyncio.sleep() |
模拟异步等待时间 |
异步 vs 同步文件IO对比
|
对比项 |
同步文件IO |
异步文件IO |
|
模式 |
阻塞 |
非阻塞 |
|
适用场景 |
单任务写文件 |
多任务并发写文件 |
|
性能 |
写入时CPU空闲 |
写入时执行其他任务 |
|
使用方式 |
open() / read() / write() |
aiofiles.open() / await f.read() / await f.write() |
进阶扩展:异步日志合并
如果你希望多个文件并发写入后再合并为一个总日志,可如下实现:
async def merge_logs(output_file, *input_files):
async with aiofiles.open(output_file, 'w', encoding='utf-8') as out:
for file in input_files:
async with aiofiles.open(file, 'r', encoding='utf-8') as f:
data = await f.read()
await out.write(f"
=== {file} ===
{data}")
调用方式:
await merge_logs("merged_log.txt", "async_log1.txt", "async_log2.txt")
应用场景:
- 异步日志系统(爬虫、API监控)
- 异步数据管道(下载 → 写入 → 读取)
- 异步批量文件操作(分析/存储)
- 异步大文件拆分与合并
✅总结:
|
知识点 |
说明 |
|
aiofiles |
异步文件读写库 |
|
async with |
上下文管理异步文件资源 |
|
await f.write() |
非阻塞文件写入 |
|
await asyncio.gather() |
并发调度多个任务 |
|
优点 |
无需多线程即可实现高并发文件IO |
© 版权声明
文章版权归作者所有,未经允许请勿转载。
相关文章
暂无评论...