跳到主要内容

并发编程

在实际项目中,程序经常需要同时处理多个任务:同时下载多个文件、同时处理多个请求、同时执行多个计算。Python 提供了多种并发编程模型,选择合适的模型能大幅提升程序效率。

并发 vs 并行

先区分两个概念:

  • 并发(Concurrency):多个任务交替执行,看起来是同时进行的。单核 CPU 通过时间片轮转实现并发。
  • 并行(Parallelism):多个任务真正同时执行。需要多核 CPU,每个核心运行一个任务。

对于 I/O 密集型 任务(网络请求、文件读写),并发就能显著提升效率——一个任务等待 I/O 时,CPU 可以切换到另一个任务。对于 CPU 密集型 任务(大量计算),则需要并行才能真正加速。

threading 模块

Python 的 threading 模块可以创建多线程:

import threading
import time

def fetch(url):
print(f"开始下载 {url}")
time.sleep(1) # 模拟网络延迟
print(f"完成下载 {url}")

urls = ["url1", "url2", "url3"]
threads = []
for url in urls:
t = threading.Thread(target=fetch, args=(url,))
threads.append(t)
t.start()

for t in threads:
t.join()

print("全部下载完成")

GIL 的限制

Python 有一个全局解释器锁(GIL, Global Interpreter Lock),它确保同一时刻只有一个线程在执行 Python 字节码。这意味着:多线程并不能加速 CPU 密集型任务

但对于 I/O 密集型任务,GIL 会在线程等待 I/O 时释放,所以多线程仍然有效。

concurrent.futures

concurrent.futures 提供了更高级的并发接口,使用起来比 threading 更简洁。

ThreadPoolExecutor

ThreadPoolExecutor 会自动管理线程池:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch(url):
time.sleep(1)
return f"{url} 的数据"

urls = [f"url{i}" for i in range(10)]

with ThreadPoolExecutor(max_workers=5) as executor:
# 提交所有任务
future_to_url = {executor.submit(fetch, url): url for url in urls}

# 按完成顺序获取结果
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
print(f"{url}: {data}")
except Exception as e:
print(f"{url} 出错: {e}")

max_workers 控制线程池的大小。对于 I/O 密集型任务,通常可以设置得大一些(比如 20 或 50)。

map 方法

如果不需要按完成顺序处理结果,可以用 map

with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(fetch, urls)
for url, data in zip(urls, results):
print(f"{url}: {data}")

注意:executor.map 会按输入顺序返回结果,即使后面的任务先完成,也会等待前面的任务。

ProcessPoolExecutor

对于 CPU 密集型任务,用 ProcessPoolExecutor 创建进程池,绕过 GIL:

from concurrent.futures import ProcessPoolExecutor

def is_prime(n):
if n < 2:
return False
for i in range(2, int(n ** 0.5) + 1):
if n % i == 0:
return False
return True

numbers = [112272535095293] * 10

with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(is_prime, numbers)
print(list(results))

进程之间的数据传递需要序列化(pickle),开销比线程大。所以只有 CPU 密集型任务才值得用进程池。

asyncio

asyncio 是 Python 3.4+ 引入的异步 I/O 框架,它使用单线程事件循环来调度多个协程(coroutine),避免了线程切换的开销,特别适合高并发的网络应用。

协程与 async/await

协程是用 async def 定义的函数:

import asyncio

async def greet(name):
await asyncio.sleep(1) # 模拟异步操作
return f"你好,{name}!"

async def main():
result = await greet("AITC")
print(result)

asyncio.run(main())
  • async def 定义的函数返回一个协程对象,不会立即执行
  • await 用于等待另一个协程完成,同时把控制权交还给事件循环
  • asyncio.run() 启动事件循环并运行主协程

并发执行多个协程

asyncio.gather 可以并发执行多个协程:

async def fetch(url):
await asyncio.sleep(1)
return f"{url} 的数据"

async def main():
urls = ["url1", "url2", "url3"]
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
for url, data in zip(urls, results):
print(f"{url}: {data}")

asyncio.run(main())

三个 fetch 几乎同时开始,总共只需要约 1 秒,而不是 3 秒。

创建任务

asyncio.create_task() 可以把协程包装成 Task 对象,让它在后台运行:

async def main():
task1 = asyncio.create_task(fetch("url1"))
task2 = asyncio.create_task(fetch("url2"))

# 在这里可以做其他事情...

result1 = await task1
result2 = await task2
print(result1, result2)

异步 HTTP 请求

实际项目中,通常用 aiohttp 做异步 HTTP 请求:

import aiohttp
import asyncio

async def fetch(session, url):
async with session.get(url) as response:
return await response.text()

async def main():
urls = [
"https://api.github.com",
"https://httpbin.org/get",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, text in zip(urls, results):
print(f"{url}: {len(text)} 字节")

asyncio.run(main())

async with 用于管理异步上下文(如连接池),确保资源正确释放。

异步迭代器

如果数据源本身是异步的(如异步数据库查询、WebSocket 消息),可以用 async for

async def ticker(delay, count):
for i in range(count):
yield i
await asyncio.sleep(delay)

async def main():
async for value in ticker(1, 5):
print(value)

asyncio.run(main())

异步生成器用 async def + yield 定义,调用时用 async for 迭代。

并发模型对比

模型适用场景优点缺点
threadingI/O 密集型简单直观GIL 限制,线程切换有开销
multiprocessingCPU 密集型利用多核进程间通信开销大
asyncio高并发 I/O单线程,开销极小代码需改写为异步风格
concurrent.futures通用接口简洁底层仍是线程/进程

小结

  • 并发让程序在等待 I/O 时不闲着,并行让多核 CPU 同时干活
  • Python 的 GIL 限制了多线程的 CPU 利用率
  • ThreadPoolExecutor 适合 I/O 密集型任务,ProcessPoolExecutor 适合 CPU 密集型任务
  • asyncio 用单线程事件循环实现高并发,适合大量网络连接的场景
  • async/await 让异步代码写起来像同步代码一样直观
  • 选择并发模型时,先判断任务是 I/O 密集型还是 CPU 密集型