并发编程
在实际项目中,程序经常需要同时处理多个任务:同时下载多个文件、同时处理多个请求、同时执行多个计算。Python 提供了多种并发编程模型,选择合适的模型能大幅提升程序效率。
并发 vs 并行
先区分两个概念:
- 并发(Concurrency):多个任务交替执行,看起来是同时进行的。单核 CPU 通过时间片轮转实现并发。
- 并行(Parallelism):多个任务真正同时执行。需要多核 CPU,每个核心运行一个任务。
对于 I/O 密集型 任务(网络请求、文件读写),并发就能显著提升效率——一个任务等待 I/O 时,CPU 可以切换到另一个任务。对于 CPU 密集型 任务(大量计算),则需要并行才能真正加速。
threading 模块
Python 的 threading 模块可以创建多线程:
import threading
import time
def fetch(url):
print(f"开始下载 {url}")
time.sleep(1) # 模拟网络延迟
print(f"完成下载 {url}")
urls = ["url1", "url2", "url3"]
threads = []
for url in urls:
t = threading.Thread(target=fetch, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
print("全部下载完成")
GIL 的限制
Python 有一个全局解释器锁(GIL, Global Interpreter Lock),它确保同一时刻只有一个线程在执行 Python 字节码。这意味着:多线程并不能加速 CPU 密集型任务。
但对于 I/O 密集型任务,GIL 会在线程等待 I/O 时释放,所以多线程仍然有效。
concurrent.futures
concurrent.futures 提供了更高级的并发接口,使用起来比 threading 更简洁。
ThreadPoolExecutor
ThreadPoolExecutor 会自动管理线程池:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def fetch(url):
time.sleep(1)
return f"{url} 的数据"
urls = [f"url{i}" for i in range(10)]
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交所有任务
future_to_url = {executor.submit(fetch, url): url for url in urls}
# 按完成顺序获取结果
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
print(f"{url}: {data}")
except Exception as e:
print(f"{url} 出错: {e}")
max_workers 控制线程池的大小。对于 I/O 密集型任务,通常可以设置得大一些(比如 20 或 50)。
map 方法
如果不需要按完成顺序处理结果,可以用 map:
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(fetch, urls)
for url, data in zip(urls, results):
print(f"{url}: {data}")
注意:executor.map 会按输入顺序返回结果,即使后面的任务先完成,也会等待前面的任务。
ProcessPoolExecutor
对于 CPU 密集型任务,用 ProcessPoolExecutor 创建进程池,绕过 GIL:
from concurrent.futures import ProcessPoolExecutor
def is_prime(n):
if n < 2:
return False
for i in range(2, int(n ** 0.5) + 1):
if n % i == 0:
return False
return True
numbers = [112272535095293] * 10
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(is_prime, numbers)
print(list(results))
进程之间的数据传递需要序列化(pickle),开销比线程大。所以只有 CPU 密集型任务才值得用进程池。
asyncio
asyncio 是 Python 3.4+ 引入的异步 I/O 框架,它使用单线程事件循环来调度多个协程(coroutine),避免了线程切换的开销,特别适合高并发的网络应用。
协程与 async/await
协程是用 async def 定义的函数:
import asyncio
async def greet(name):
await asyncio.sleep(1) # 模拟异步操作
return f"你好,{name}!"
async def main():
result = await greet("AITC")
print(result)
asyncio.run(main())
async def定义的函数返回一个协程对象,不会立即执行await用于等待另一个协程完成,同时把控制权交还给事件循环asyncio.run()启动事件循环并运行主协程
并发执行多个协程
asyncio.gather 可以并发执行多个协程:
async def fetch(url):
await asyncio.sleep(1)
return f"{url} 的数据"
async def main():
urls = ["url1", "url2", "url3"]
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
for url, data in zip(urls, results):
print(f"{url}: {data}")
asyncio.run(main())
三个 fetch 几乎同时开始,总共只需要约 1 秒,而不是 3 秒。
创建任务
asyncio.create_task() 可以把协程包装成 Task 对象,让它在后台运行:
async def main():
task1 = asyncio.create_task(fetch("url1"))
task2 = asyncio.create_task(fetch("url2"))
# 在这里可以做其他事情...
result1 = await task1
result2 = await task2
print(result1, result2)
异步 HTTP 请求
实际项目中,通常用 aiohttp 做异步 HTTP 请求:
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://api.github.com",
"https://httpbin.org/get",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, text in zip(urls, results):
print(f"{url}: {len(text)} 字节")
asyncio.run(main())
async with 用于管理异步上下文(如连接池),确保资源正确释放。
异步迭代器
如果数据源本身是异步的(如异步数据库查询、WebSocket 消息),可以用 async for:
async def ticker(delay, count):
for i in range(count):
yield i
await asyncio.sleep(delay)
async def main():
async for value in ticker(1, 5):
print(value)
asyncio.run(main())
异步生成器用 async def + yield 定义,调用时用 async for 迭代。
并发模型对比
| 模型 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| threading | I/O 密集型 | 简单直观 | GIL 限制,线程切换有开销 |
| multiprocessing | CPU 密集型 | 利用多核 | 进程间通信开销大 |
| asyncio | 高并发 I/O | 单线程,开销极小 | 代码需改写为异步风格 |
| concurrent.futures | 通用 | 接口简洁 | 底层仍是线程/进程 |
小结
- 并发让程序在等待 I/O 时不闲着,并行让多核 CPU 同时干活
- Python 的 GIL 限制了多线程的 CPU 利用率
ThreadPoolExecutor适合 I/O 密集型任务,ProcessPoolExecutor适合 CPU 密集型任务asyncio用单线程事件循环实现高并发,适合大量网络连接的场景async/await让异步代码写起来像同步代码一样直观- 选择并发模型时,先判断任务是 I/O 密集型还是 CPU 密集型