Installing the library

pip install aiohttp

Version 1: naive concurrency

import asyncio
import aiohttp

async def fetch(session, url):
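    # aiohttp expects a ClientTimeout object here; a bare number is deprecated
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as r: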
        return await r.text()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, u) for u in urls]
        return await asyncio.gather(*tasks)

urls = [f"https://example.com/{i}" for i in range(100)]
results = asyncio.run(main(urls))

All 100 requests go out at once, but the target server may not handle the load, and your own network link can saturate too.

Limiting concurrency: Semaphore

At most 10 requests in flight at a time:

async def fetch_limited(sem, session, url):
    async with sem:                 # take one slot
        async with session.get(url) as r:
            return await r.text()

async def main(urls):
    sem = asyncio.Semaphore(10)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(sem, session, u) for u in urls]
        return await asyncio.gather(*tasks)

With Semaphore(10), at most 10 coroutines can be inside `async with sem` at the same time; any others wait in line until a slot frees up.
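To see the limit at work without touching the network, here is a minimal sketch that counts how many workers are inside the semaphore at once. The names `worker` and `measure_peak` are hypothetical, and the worker only sleeps as a stand-in for a real request.

```python
import asyncio

async def worker(sem, state, delay=0.01):
    # Hypothetical stand-in for a network request: it only sleeps.
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(delay)
        state["active"] -= 1

async def measure_peak(n_tasks=50, limit=10):
    # Create the semaphore inside the running loop, then start all workers.
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(n_tasks)))
    return state["peak"]

if __name__ == "__main__":
    # 50 workers are started, but the peak never exceeds the limit of 10.
    print(asyncio.run(measure_peak()))
```

All 50 coroutines are scheduled immediately; the semaphore only controls how many of them are past the `async with` line at any moment.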

Adding timeouts

import aiohttp

timeout = aiohttp.ClientTimeout(total=10, connect=3)
async with aiohttp.ClientSession(timeout=timeout) as session:
    ...

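This allows at most 10 seconds for the whole request (connecting, sending, and reading the response) and at most 3 seconds to obtain a connection.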

Adding retries

async def fetch_retry(sem, session, url, attempts=3):
    async with sem:
        for i in range(attempts):
            try:
                async with session.get(url) as r:
                    r.raise_for_status()        # raise on 4xx/5xx
                    return await r.text()
            except (aiohttp.ClientError, asyncio.TimeoutError):
                if i == attempts - 1:
                    raise                       # out of attempts: propagate
                await asyncio.sleep(2 ** i)     # exponential backoff: 1s, 2s, ...
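One possible variation, sketched here with the standard library only: take the semaphore slot per attempt instead of around the whole loop, so a coroutine that is waiting out its backoff does not occupy a slot, and add a little random jitter to the delay. `FlakyError`, `make_flaky`, `retry`, and `demo` are hypothetical names for this illustration; `FlakyError` stands in for `aiohttp.ClientError`.

```python
import asyncio
import random

class FlakyError(Exception):
    """Hypothetical transient failure, standing in for aiohttp.ClientError."""

def make_flaky(fail_times):
    # Build a fake request that fails `fail_times` times, then succeeds.
    calls = {"n": 0}

    async def request():
        calls["n"] += 1
        if calls["n"] <= fail_times:
            raise FlakyError(f"attempt {calls['n']} failed")
        return f"ok after {calls['n']} attempts"

    return request

async def retry(sem, request, attempts=3, base_delay=0.01):
    for i in range(attempts):
        try:
            async with sem:          # hold a slot only while the request runs
                return await request()
        except FlakyError:
            if i == attempts - 1:
                raise                # out of attempts: propagate the error
        # The slot is released here, so other requests can run during the wait.
        await asyncio.sleep(base_delay * 2 ** i + random.uniform(0, base_delay))

async def demo(fail_times, attempts=3):
    sem = asyncio.Semaphore(2)
    return await retry(sem, make_flaky(fail_times), attempts=attempts)

if __name__ == "__main__":
    print(asyncio.run(demo(fail_times=2)))
```

Whether to hold the slot during backoff is a design choice: holding it keeps total pressure on the server lower, releasing it keeps throughput up when only a few URLs are failing.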

Adding progress reporting and exception isolation

You don't want one failing URL to bring everything else down, so use gather(..., return_exceptions=True):

async def main(urls):
    sem = asyncio.Semaphore(10)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_retry(sem, session, u) for u in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    success = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    print(f"succeeded {len(success)} / failed {len(failed)}")
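Counting failures is rarely enough; you usually want to know which URLs failed. Because `gather` returns results in the same order as its inputs, zipping them with the URL list recovers that mapping. A minimal sketch, where `fake_fetch` and `fetch_and_split` are hypothetical names and `fake_fetch` stands in for a real fetch coroutine:

```python
import asyncio

async def fake_fetch(url):
    # Hypothetical stand-in for a real fetch: URLs containing "bad" fail.
    await asyncio.sleep(0.01)
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return f"body of {url}"

async def fetch_and_split(urls):
    results = await asyncio.gather(*(fake_fetch(u) for u in urls),
                                   return_exceptions=True)
    ok, failed = {}, {}
    # gather preserves input order, so zip lines each result up with its URL.
    for url, result in zip(urls, results):
        if isinstance(result, BaseException):
            failed[url] = result
        else:
            ok[url] = result
    return ok, failed

if __name__ == "__main__":
    ok, failed = asyncio.run(fetch_and_split(["a", "bad", "c"]))
    print(f"succeeded: {list(ok)}, failed: {list(failed)}")
```

Checking against `BaseException` rather than `Exception` also catches `asyncio.CancelledError`, which does not derive from `Exception` on Python 3.8 and later.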

Full version: a crawler you can actually use

import asyncio
import aiohttp
from typing import Iterable

async def fetch(sem, session, url, attempts=3):
    async with sem:
        for i in range(attempts):
            try:
                async with session.get(url) as r:
                    r.raise_for_status()
                    return {"url": url, "ok": True, "data": await r.text()}
            except Exception as e:
                if i == attempts - 1:
                    return {"url": url, "ok": False, "error": str(e)}
                await asyncio.sleep(2 ** i)

async def fetch_all(urls: Iterable[str], concurrency: int = 10):
    sem = asyncio.Semaphore(concurrency)
    timeout = aiohttp.ClientTimeout(total=15)
    headers = {"User-Agent": "MyBot/1.0"}

    async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:
        tasks = [fetch(sem, session, u) for u in urls]
        for fut in asyncio.as_completed(tasks):
            yield await fut

# Usage
async def main():
    urls = [f"https://httpbin.org/delay/{i % 3}" for i in range(50)]
    async for result in fetch_all(urls, concurrency=10):
        if result["ok"]:
            print(f"✓ {result['url']}")
        else:
            print(f"✗ {result['url']} - {result['error']}")

asyncio.run(main())

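50 URLs, at most 10 concurrent requests, up to 3 attempts per URL, and results streamed in completion order: a solid baseline for production use.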
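The streaming behavior comes from `asyncio.as_completed`, which yields awaitables in the order they finish rather than the order they were submitted. That also makes a simple progress counter easy to add. A minimal sketch with sleeps in place of requests; `fake_fetch`, `stream`, and `collect` are hypothetical names:

```python
import asyncio

async def fake_fetch(name, delay):
    # Hypothetical stand-in for a request that takes `delay` seconds.
    await asyncio.sleep(delay)
    return name

async def stream(jobs):
    # jobs is a list of (name, delay) pairs; all tasks start right away.
    tasks = [asyncio.create_task(fake_fetch(n, d)) for n, d in jobs]
    done = 0
    for fut in asyncio.as_completed(tasks):
        name = await fut
        done += 1
        yield done, len(tasks), name   # progress counter plus the result

async def collect(jobs):
    return [item async for item in stream(jobs)]

if __name__ == "__main__":
    jobs = [("slow", 0.2), ("fast", 0.01), ("mid", 0.1)]
    for done, total, name in asyncio.run(collect(jobs)):
        print(f"[{done}/{total}] {name}")
```

The fastest job is reported first even though it was submitted second, which is the same effect the crawler relies on when it prints results as they arrive.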

httpx: another option

pip install httpx

Its API closely mirrors requests, and it supports both synchronous and asynchronous use:

import httpx

async def get_text(url):
    # `async with` is only valid inside a coroutine
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        return r.text

If a project mixes synchronous and asynchronous code, httpx is friendlier than aiohttp.

Anti-pattern: synchronous sleep

import time

async def bad():
    time.sleep(1)        # ❌ blocks the entire event loop

While the event loop is blocked, every coroutine stops making progress, so always use await asyncio.sleep instead.
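The cost is easy to measure. In this minimal sketch, three coroutines each sleep for 0.1 seconds; with `time.sleep` they run one after another, while with `asyncio.sleep` they overlap. The function names are hypothetical and the exact timings will vary by machine.

```python
import asyncio
import time

async def blocking_sleep(delay):
    time.sleep(delay)            # blocks the event loop: nothing else can run

async def cooperative_sleep(delay):
    await asyncio.sleep(delay)   # yields control back to the event loop

async def elapsed(coro_fn, n=3, delay=0.1):
    # Run n copies "concurrently" and return the wall-clock time taken.
    start = time.perf_counter()
    await asyncio.gather(*(coro_fn(delay) for _ in range(n)))
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"time.sleep:    {asyncio.run(elapsed(blocking_sleep)):.2f}s")
    print(f"asyncio.sleep: {asyncio.run(elapsed(cooperative_sleep)):.2f}s")
```

The blocking version should take roughly three times as long, because each `time.sleep` call holds the loop until it returns.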

The next post covers package structure and publishing.