装库
pip install aiohttp
第一版:朴素并发
import asyncio
import aiohttp
async def fetch(session, url):
    """Download ``url`` through ``session`` and return the response body as text.

    The request is issued with ``timeout=10``; the response context is
    closed before the text is handed back to the caller.
    """
    async with session.get(url, timeout=10) as response:
        body = await response.text()
    return body
async def main(urls):
    """Fetch every URL in ``urls`` concurrently over one shared client session.

    Returns whatever ``asyncio.gather`` produces for the ``fetch`` coroutines.
    No concurrency cap is applied here: every request is started at once.
    """
    async with aiohttp.ClientSession() as session:
        pending = (fetch(session, link) for link in urls)
        return await asyncio.gather(*pending)
# Demo driver: build 100 example URLs and run main() to completion.
urls = [f"https://example.com/{i}" for i in range(100)]
# asyncio.run starts an event loop, runs the coroutine, and returns its result.
results = asyncio.run(main(urls))
100 个请求并发——但对方服务器可能扛不住,自己网络也可能被打满。
加限流:Semaphore
最多同时 10 个请求:
async def fetch_limited(sem, session, url):
    """Return the body of ``url`` as text while holding one slot of ``sem``.

    The semaphore is entered before the request is issued and released once
    the response has been read, so ``sem`` bounds how many of these calls
    run their request at the same time.
    """
    # Entering ``sem`` claims one slot; the request only starts after that.
    async with sem, session.get(url) as response:
        return await response.text()
async def main(urls):
    """Fetch every URL in ``urls`` with at most 10 requests in flight.

    A single ``asyncio.Semaphore(10)`` is shared by all ``fetch_limited``
    coroutines; the result of ``asyncio.gather`` over them is returned.
    """
    sem = asyncio.Semaphore(10)
    async with aiohttp.ClientSession() as session:
        jobs = (fetch_limited(sem, session, link) for link in urls)
        return await asyncio.gather(*jobs)
Semaphore(10) 让 async with sem 内部最多 10 个并发——超过就排队。
加超时
import aiohttp
# Timeout budget for the session: total=10 and connect=3 (seconds, per the
# surrounding text).
timeout = aiohttp.ClientTimeout(total=10, connect=3)
# The timeout is handed to the session constructor rather than to each request.
# NOTE: this is a fragment -- `async with` has to live inside a coroutine.
async with aiohttp.ClientSession(timeout=timeout) as session:
    ...
整个请求(从获取连接到读完响应)最多 10 秒,其中获取连接(新建连接,或等待连接池空位)最多 3 秒。
加重试
async def fetch_retry(sem, session, url, attempts=3):
    """Fetch ``url`` as text, retrying failed attempts with exponential backoff.

    Args:
        sem: Semaphore bounding how many requests are in flight at once.
        session: Client session whose ``get`` issues the request.
        url: Address to download.
        attempts: Maximum number of tries; must be at least 1.

    Returns:
        The response body as text.

    Raises:
        ValueError: If ``attempts`` is less than 1 (previously this silently
            returned ``None`` without making any request).
        aiohttp.ClientError, asyncio.TimeoutError: Re-raised from the final
            attempt once every try has failed.
    """
    if attempts < 1:
        raise ValueError(f"attempts must be >= 1, got {attempts!r}")
    for i in range(attempts):
        try:
            # Hold a slot only while a request is actually in flight, so a URL
            # that is sleeping between retries does not starve the others.
            async with sem, session.get(url) as r:
                r.raise_for_status()  # raises on 4xx/5xx responses
                return await r.text()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if i == attempts - 1:
                raise
        # Exponential backoff between tries: 1s, 2s, 4s, ...
        await asyncio.sleep(2 ** i)
加结果统计 + 异常隔离
不希望一个 URL 挂了所有都崩——用 gather(..., return_exceptions=True):
async def main(urls):
    """Fetch all ``urls`` with retries and print how many succeeded or failed.

    Failures are isolated: ``gather(..., return_exceptions=True)`` puts each
    raised exception into the result list instead of propagating the first
    one, so a single bad URL does not abort the whole batch.

    Args:
        urls: Iterable of addresses passed one by one to ``fetch_retry``.
    """
    sem = asyncio.Semaphore(10)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_retry(sem, session, u) for u in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    success, failed = [], []
    for r in results:
        # Test against BaseException, not Exception: gather can also hand back
        # asyncio.CancelledError, which is not an Exception subclass on
        # Python 3.8+ and would otherwise be counted as a success.
        (failed if isinstance(r, BaseException) else success).append(r)
    print(f"成功 {len(success)} / 失败 {len(failed)}")
完整版:真能用的爬虫
import asyncio
import aiohttp
from typing import Iterable
async def fetch(sem, session, url, attempts=3):
    """Fetch ``url`` with retries and report the outcome as a result dict.

    Request failures are never raised to the caller; they are folded into the
    returned dict so one bad URL cannot break a batch.

    Args:
        sem: Semaphore bounding how many requests are in flight at once.
        session: Client session whose ``get`` issues the request.
        url: Address to download.
        attempts: Maximum number of tries; must be at least 1.

    Returns:
        ``{"url": url, "ok": True, "data": <body text>}`` on success, or
        ``{"url": url, "ok": False, "error": <message>}`` once every try
        has failed.

    Raises:
        ValueError: If ``attempts`` is less than 1 (previously this silently
            returned ``None``, which breaks consumers indexing the result).
    """
    if attempts < 1:
        raise ValueError(f"attempts must be >= 1, got {attempts!r}")
    for i in range(attempts):
        try:
            # Hold a slot only while a request is actually in flight, so a URL
            # that is sleeping between retries does not starve the others.
            async with sem, session.get(url) as r:
                r.raise_for_status()
                return {"url": url, "ok": True, "data": await r.text()}
        except Exception as e:  # deliberate boundary: failures become results
            if i == attempts - 1:
                # str() of a timeout error is empty; fall back to repr() so the
                # failure is never reported with a blank message.
                return {"url": url, "ok": False, "error": str(e) or repr(e)}
        # Exponential backoff between tries: 1s, 2s, 4s, ...
        await asyncio.sleep(2 ** i)
async def fetch_all(urls: Iterable[str], concurrency: int = 10):
    """Fetch ``urls`` concurrently and yield each result as soon as it is ready.

    This is an async generator: iterate it with ``async for``. Results arrive
    in completion order, not in the order of ``urls``.

    Args:
        urls: Addresses to download; each one is passed to ``fetch``.
        concurrency: Size of the semaphore shared by all downloads; must be
            at least 1.

    Yields:
        Whatever ``fetch`` returns for each URL.

    Raises:
        ValueError: On first iteration, if ``concurrency`` is less than 1
            (a zero-sized semaphore would make every download wait forever).
    """
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency!r}")
    sem = asyncio.Semaphore(concurrency)
    timeout = aiohttp.ClientTimeout(total=15)
    headers = {"User-Agent": "MyBot/1.0"}
    async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:
        # Create the tasks explicitly so they can be cancelled below.
        tasks = [asyncio.create_task(fetch(sem, session, u)) for u in urls]
        try:
            for fut in asyncio.as_completed(tasks):
                yield await fut
        finally:
            # If the consumer stops iterating early (or an error escapes),
            # downloads may still be in flight. Cancel and drain them before
            # the session closes, instead of leaving pending tasks behind.
            # Cancelling an already-finished task is a no-op.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
# Usage
async def main():
    """Stream results for 50 httpbin delay URLs and print one line per URL.

    Successful downloads are printed with a check mark; failed ones with a
    cross followed by the error message carried in the result.
    """
    urls = [f"https://httpbin.org/delay/{i % 3}" for i in range(50)]
    async for result in fetch_all(urls, concurrency=10):
        if not result["ok"]:
            print(f"✗ {result['url']} - {result['error']}")
            continue
        print(f"✓ {result['url']}")
# Script entry point: run the main() coroutine to completion.
asyncio.run(main())
50 个 URL、最多 10 个并发、每个 URL 最多尝试 3 次(失败后指数退避重试)、按完成顺序流式输出——生产级别。
httpx:另一个选择
pip install httpx
API 几乎和 requests 一致(与 aiohttp 也相近,但 r.text 是属性而不是协程),同时支持同步和异步:
import httpx
# Fragment: `url` comes from the surrounding code, and `async with` / `await`
# have to run inside a coroutine.
async with httpx.AsyncClient() as client:
    r = await client.get(url)
如果项目同步异步代码混着用,httpx 比 aiohttp 友好。
反模式:同步 sleep
async def bad():
    # Anti-pattern shown on purpose: a synchronous sleep inside a coroutine.
    time.sleep(1)  # ❌ blocks the entire event loop
事件循环卡住时所有协程都停——一定要用 await asyncio.sleep。
下一篇讲包结构与发布。