选型不纠结:在高并发场景下如何权衡
asyncio、线程与进程(附可落地的实践清单)
asyncio
面向对象:既要打好并发基础的初学者,又想在生产环境中稳住吞吐与稳定性的资深开发者
目标:给出一套可操作的选型方法与工程化范式,并提供可直接粘贴使用的代码片段
开篇引入:并发的“路口选择题”
从最早的自动化脚本,到 Web 后端、数据采集、实时处理与 AI 工程,Python 在高并发场景下的存在感与日俱增。与此同时,问题也随之而来:到底用 还是线程池?CPU 吃紧时需不需要进程池?
asyncio
这篇文章不止讲“概念”,更给出可执行的选型流程、实践清单与踩坑解法。核心观点提前亮出:
I/O 密集、高连接数、库支持良好 ⇒ 优先 。I/O 阻塞库(难以替换)、少量并发 ⇒ 线程池。CPU 密集(避开 GIL)⇒ 进程池。异步主流程 + 少量阻塞点 ⇒
asyncio/
asyncio.to_thread 混合。混合流水线:线程/异步负责 I/O,进程负责重计算。
run_in_executor
一、基础知识速写:为什么会有 3 条路
线程(Thread):同一进程内共享内存,上下文切换轻,适合 I/O;但受 GIL 限制,CPU 密集不增益;要考虑线程安全。进程(Process):独立内存空间,可并行跑满多核;但创建/通信成本高,需可序列化(pickle),适合 CPU 密集。(协程):单线程事件循环,切换成本更低,适合大量并发 I/O;要求使用异步库,阻塞调用会卡整个 loop。
asyncio
一句话:选
看库生态,选线程看阻塞点与线程安全,选进程看 CPU 与内存边界。
asyncio
二、三选一的“十步走”决策树(可打印贴墙)
你的瓶颈是 I/O 还是 CPU?
主要等待网络/磁盘/数据库 ⇒ I/O主要算公式/解码/特征工程 ⇒ CPU
I/O 类型是否异步友好(有成熟的 async 库)?
有:HTTP(/
aiohttp)、PostgreSQL(
httpx)、Redis(
asyncpg)、S3(aiobotocore)等 ⇒
redis.asyncio无:老旧/私有 SDK、
asyncio、某些驱动 ⇒ 优先线程池包裹
requests
需要同时维护海量并发连接(10k+)?
需要 ⇒ (配合
asyncio、
uvloop、
asyncio.Semaphore)不需要(例如千级并发)⇒ 线程池也可胜任
asyncio.Queue
CPU 计算是否显著(>1 核死命跑)?
是 ⇒ 进程池(),异步/线程只做 I/O否 ⇒ 保持
ProcessPoolExecutor 或线程
asyncio
生态迁移成本
能换到 async 驱动:就换(收益长期)不能:线程池 + 超时 + 重试 + 限流
混合模式是否更优?
异步主干 + “吃掉”少量阻塞线程/异步调度数据,进程批量算
to_thread
平台限制
Windows/macOS 默认 ;Linux 常见
spawn。Pickle 代价、可序列化性要评估
fork
可观测性准备好吗?
指标:吞吐、TP95/TP99、失败率、超时率、队列长度、FD/线程数/内存
故障策略
超时、重试、熔断、降级、取消()、优雅关停
cancel_futures
回归串行基线
并发前先测串行指标,确保“并发真的带来收益”
三、
asyncio:当 I/O 与连接数成为主旋律
asyncio
3.1 典型场景
高并发 HTTP 客户端/服务端、推送长连接(WebSocket)、爬虫、实时数据转发数据库/缓存异步驱动(、
asyncpg、
redis.asyncio for MongoDB)
motor
3.2 最小闭环示例:限速 + 超时 + 重试 + 背压
import asyncio
import random
from contextlib import asynccontextmanager
import aiohttp
@asynccontextmanager
async def get_session():
timeout = aiohttp.ClientTimeout(total=5)
async with aiohttp.ClientSession(timeout=timeout) as s:
yield s
async def fetch(session, url, sem, retries=2):
async with sem: # 背压阀
for _ in range(retries + 1):
try:
async with session.get(url) as r:
return r.status, await r.text()
except Exception:
await asyncio.sleep(0.2 * random.random())
raise RuntimeError(f"failed: {url}")
async def main(urls):
sem = asyncio.Semaphore(200) # 最大并发
async with get_session() as s:
tasks = [asyncio.create_task(fetch(s, u, sem)) for u in urls]
results = []
for coro in asyncio.as_completed(tasks, timeout=30):
try:
results.append(await coro)
except Exception as e:
results.append(("ERR", str(e)))
return results
if __name__ == "__main__":
urls = [f"https://httpbin.org/status/{c}" for c in (200, 201, 404, 503)] * 200
print(asyncio.run(main(urls))[:5])
实践要点
用 做限流,避免压垮下游或耗尽 FD/端口
Semaphore + 局部超时,先收先处理统一
as_completed,减少 TLS/连接开销可考虑
ClientSession(Linux 下)进一步提升事件循环性能
uvloop
四、线程(ThreadPoolExecutor):当你必须“拥抱阻塞”
4.1 典型场景
无法替换的阻塞库:、某些 DB/存储驱动、老旧内网 SDK并发规模中等(几百~一两千并发)、对线程安全可控
requests
4.2 通用线程池模板:超时、重试、批量收敛
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests, time
def worker(url):
for _ in range(2):
try:
r = requests.get(url, timeout=3)
r.raise_for_status()
return r.status_code
except Exception:
time.sleep(0.2)
raise RuntimeError("retry-exceeded")
def run(urls, workers=64, timeout=30):
ok, bad = [], []
with ThreadPoolExecutor(max_workers=workers) as pool:
fut2url = {pool.submit(worker, u): u for u in urls}
for f in as_completed(fut2url, timeout=timeout):
u = fut2url[f]
try:
ok.append((u, f.result()))
except Exception as e:
bad.append((u, repr(e)))
return ok, bad
if __name__ == "__main__":
urls = [f"https://httpbin.org/status/{c}" for c in (200, 201, 404, 503)] * 200
success, failure = run(urls)
print(len(success), len(failure))
工程要点
线程安全:每线程独立资源(如 用
requests.Session)背压:
threading.local() ≈ 下游限额与本机 FD 限额之间的平衡避免共享可变全局,或使用锁/队列;优先不可变/消息传递
max_workers
五、进程(ProcessPoolExecutor):让 CPU 真并行
5.1 典型场景
编解码、图像处理、加密、数值模拟、特征计算、批量推理前处理任务可拆分、参数与返回可 pickle
5.2 进程池模板:切片 + 批量 + 避免巨对象传参
from concurrent.futures import ProcessPoolExecutor, as_completed
import os, math
def heavy_chunk(start, end):
# 模拟重计算
s = 0
for i in range(start, end):
s += (i * i) % 10007
return s
def run(n=10_000_000, parts=8, workers=None):
step = math.ceil(n / parts)
tasks = [(i, min(i+step, n)) for i in range(0, n, step)]
res = 0
with ProcessPoolExecutor(max_workers=workers) as pool:
futs = [pool.submit(heavy_chunk, a, b) for a, b in tasks]
for f in as_completed(futs):
res += f.result()
return res
if __name__ == "__main__":
print("CPU count:", os.cpu_count())
print(run())
工程要点
避免传巨大对象:传文件路径/偏移量/索引,子进程内部加载Windows/macOS 下 :确保入口
spawn进程数量 ≈ CPU 核心或略少;观察内存/NUMA/缓存抖动
if __name__ == "__main__":
六、混合架构:对症下药的“组装工”
现实里,单一技术很少能完美适配,混合往往是优解。
6.1 异步主干 +
to_thread 消化少量阻塞
to_thread
import asyncio, time, requests
def blocking_get(url):
return requests.get(url, timeout=3).text
async def main(urls):
# 异步里并发调度阻塞调用
tasks = [asyncio.to_thread(blocking_get, u) for u in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
urls = ["https://httpbin.org/get"] * 100
print(len(asyncio.run(main(urls))))
适用:异步生态占主导,但遗留少量阻塞点,无需为此改动系统结构。
6.2 线程/异步做 I/O,进程做重计算(流水线)
Stage A(抓取/读取):/线程池Stage B(计算/解析):进程池Stage C(写回/落库):
asyncio/线程池 + 批量提交通过队列/中间存储(Redis/Kafka/文件)实现解耦与回放
asyncio
七、关键工程议题与常见坑
7.1 超时与取消
、
asyncio.wait_for;线程/进程使用
ClientTimeout关停时
future.result(timeout=…),避免悬挂任务
shutdown(cancel_futures=True)
7.2 限流与背压
异步:、令牌桶(自实现)、
Semaphore 的连接池上限线程:
aiohttp 就是门闸;结合队列长度监控进程:任务切片粒度、并发进程数、磁盘/内存压力
max_workers
7.3 线程安全与可重入
驱动/SDK 把握不准时,每线程自有实例避免在回调里抛异常致线程静默退出;统一捕获并记录
7.4 Pickle 与可序列化
进程池函数需在顶层定义;闭包/局部函数不可用对象不可序列化时,传递标识符,子进程自行重新构造
7.5 资源上限
文件描述符(FD)、最大并发连接数(ulimit)、端口耗尽(TIME_WAIT)数据库连接池上限:应用层并发应 ≤ 下游连接池容量
7.6 监控与压测
指标:吞吐 QPS、P50/P95/P99、错误/超时率、任务排队时长工具:简单起步可用日志直方图;逐步接入 APM/metrics
八、从 0 到 1:三套“可复用骨架”
8.1 异步抓取骨架(HTTP + 限流)
# 核心:复用 Session、Semaphore 限流、as_completed 收敛、统一重试
(见 3.2 代码,可直接上手)
8.2 线程池代理阻塞库骨架(微服务/ETL 通用)
# 核心:ThreadPoolExecutor + as_completed + 本地会话/连接 + 批处理
(见 4.2 代码,可直接上手)
8.3 进程池重计算骨架(特征/批量转换)
# 核心:任务切片、防巨物传参、结果聚合
(见 5.2 代码,可直接上手)
九、性能调参与实验方法
先串行跑通,记录基线并发只调一个旋钮(如 ),画吞吐-延迟曲线找到“膝点”(吞吐收益递减且错误/超时上升之前)打开重试与限流,观测是否稳定长跑 30–60 分钟,检视内存增长/Fd 泄漏/句柄未回收
max_workers
经验值:
异步:
从 200 起步,按下游限额与 CPU 调整线程:64–256 之间找到平衡(取决于 IO/CPU 比与驱动特性)进程:
Semaphore,任务粒度以 0.1–1s 为宜
min(核数, 磁盘/内存允许的并发)
十、团队最佳实践 Checklist
识别瓶颈:I/O vs CPU 明确选型: / 线程 / 进程 / 混合 超时:单任务与全局 限流:Semaphore /
asyncio / 连接池 失败策略:重试、退避、熔断、降级 资源隔离:线程局部、子进程重建资源 观测:吞吐、分位延迟、错误、队列、FD/线程/内存 优雅关停:
max_workers、信号处理 回放机制:失败列表/中间件(Kafka/Redis/文件) 文档化:骨架代码与参数经验沉淀到团队模板
cancel_futures
十一、案例:小型实时抓取+解析+计算的三段式流水线
目标:
Stage A:高并发抓取( +
asyncio)Stage B:CPU 解析(
aiohttp)Stage C:落库(线程/异步任一,示例用异步批量写入伪代码)
ProcessPoolExecutor
关键思路:A 端只传 URL 与轻量文本;B 端进程内解析;C 端批量提交,限速写库。
由于篇幅,完整代码可直接把本文第 3.2 与 5.2 的片段拼装,A→队列→B→队列→C,三个独立组件用
或中间件解耦即可。
asyncio.Queue
十二、前沿与展望
在 Linux 上可明显降低事件循环开销子解释器(subinterpreters)(前瞻):有望在同进程内打破 GIL 限制(生态仍在演进中)异步生态日益完善:
uvloop、
httpx、
asyncpg、
redis.asyncio 等已能覆盖多数后端场景混合更常态:服务端异步 + 后台进程池计算 + 观测/重放链路成为标配
aiokafka
十三、总结与互动
要点回顾
I/O 型并发且库支持异步 ⇒ ;阻塞库不可替换 ⇒ 线程;CPU 重活 ⇒ 进程。“超时、限流、重试、观测、优雅关停”是高并发代码的五件套。混合流水线更贴近实际生产:用对工具、各司其职。
asyncio
开放问题
你们的主瓶颈更多来自 I/O 还是 CPU?如何量化?在异步主流程里,你是如何优雅处理遗留阻塞库的?进程池的任务切片,你更偏好按“时间”还是“数据量”来定?
欢迎在评论区分享你的经验和“坑点”,一起把并发这件事做稳、做快、做可维护。
SEO 与排版建议(给发布时的你)
关键词自然分布:Python并发、asyncio、线程池、进程池、高并发、最佳实践结构化标题(H2/H3)+ 代码高亮 + 简单流程图/ASCII 图文末提供“骨架代码”下载/链接,提升收藏与转发率
附录:可即取即用的小片段
A. 计时与指标采集装饰器
import time
from functools import wraps
from statistics import mean
_METRICS = {"latency": []}
def record_latency(name="task"):
def deco(fn):
@wraps(fn)
def wrap(*a, **k):
t0 = time.perf_counter()
try:
return fn(*a, **k)
finally:
_METRICS["latency"].append(time.perf_counter() - t0)
return wrap
return deco
def summary():
lat = _METRICS["latency"]
if lat:
print(f"count={len(lat)} mean={mean(lat):.4f}s max={max(lat):.4f}s")
B. 线程局部资源(避免跨线程共享)
import threading, requests
_tls = threading.local()
def session():
if not hasattr(_tls, "s"):
_tls.s = requests.Session()
return _tls.s
C.
asyncio 中的并发限流与超时组合
asyncio
import asyncio, aiohttp
async def bounded_get(url, sem, session):
async with sem:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as r:
return await r.text()
async def run(urls):
sem = asyncio.Semaphore(100)
async with aiohttp.ClientSession() as s:
tasks = [asyncio.create_task(bounded_get(u, sem, s)) for u in urls]
done, pending = await asyncio.wait(tasks, timeout=20)
for p in pending:
p.cancel()
return [d.result() for d in done if not d.cancelled()]
把本文的选型树 + 骨架代码沉淀到团队模板,你会发现:并发方案的难点并不在“写法多”,而在“约束清晰、职责清楚、观测完备”。当这些都到位,高并发就不再是赌运气,而是一门可反复复制的工程学。