aiohttp:PythonでデータスクレイピングとAPI呼び出しを10倍速くする秘訣

Python tutorial - IT technology blog
Python tutorial - IT technology blog

なぜ大規模なタスクでRequestsの使用をやめるべきなのか?

Pythonで開発をしているなら、APIを呼び出す際にrequestsは最も身近な存在でしょう。シンプルで非常に安定しています。しかし、ECサイトの5,000件の製品データをスクレイピングしたり、10,000件のリンクを同時にチェックしたりといった膨大なワークロードを処理する必要がある場合、問題が発生します。

問題は、requests同期(synchronous)メカニズムで動作することにあります。リクエストを送信すると、サーバーからのレスポンスを待つ間、プログラム全体が「フリーズ」してしまいます。例えば1リクエストに0.5秒かかるとすると、1,000ページを処理するのに8分以上かかります。本番環境(production)において、これは許容できないリソースの無駄です。

実際のプロジェクトでニュース収集システムを最適化した際、私は完全にaiohttpへ移行しました。結果は驚くべきものでした。実行時間が15分から40秒未満に短縮されたのです。asyncioの力を活用することで、aiohttpは前のリクエストが終わるのを待たずに大量のリクエストを送信することを可能にします。

5分で始めるクイックスタート

まずは、ライブラリと高速なDNSリゾルバをインストールしましょう:

pip install aiohttp aiodns

以下は、非同期スタイルでデータを取得する基本的な関数の実装方法です:

import aiohttp
import asyncio

async def fetch_status(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            status = response.status
            print(f"ステータス: {status} URL: {url}")
            return await response.text()

if __name__ == "__main__":
    # 非同期関数をイベントループで実行
    asyncio.run(fetch_status('https://google.com'))

注意:asyncawaitのキーワードのペアを使用する必要があります。これはPythonに対して「ネットワークのレスポンスを待っている間、何もしないのではなく、他の仕事をしてください!」と伝える合図です。

秘密はClientSessionにあり

初心者の多くは、関数を呼び出すたびに新しいClientSessionを作成するという間違いを犯しがばちです。実際、これはメモリを溢れさせ、「ソケットリーク(socket leak)」を引き起こす最短の方法です。

ClientSessionは、全体を通した一本の水道管のようなものだと考えてください。一度開いたら、すべてのリクエストで共有し、コネクションプーリング(connection pooling)を活用すべきです。確立済みのTCP接続を再利用することで、ハンドシェイク(handshake)を繰り返す必要がなくなり、処理時間を最大30%節約できます。

複数のAPIを同時に処理する方法

ここからがaiohttpの真骨頂です。タスクをリストにまとめ、asyncio.gatherを使って同時に実行します。

import aiohttp
import asyncio
import time

async def get_data(session, url):
    async with session.get(url) as resp:
        return await resp.json()

async def main():
    urls = ['https://jsonplaceholder.typicode.com/posts/1'] * 50
    
    async with aiohttp.ClientSession() as session:
        tasks = [get_data(session, url) for url in urls]
        # 50件のリクエストを同時に開始
        results = await asyncio.gather(*tasks)
        print(f"{len(results)} 件の結果を処理しました")

start = time.perf_counter()
asyncio.run(main())
print(f"完了までの時間: {time.perf_counter() - start:.2f} 秒")

IPブロックを避けるためのレート制限

1秒間に大量のリクエストを送信すると、サーバーからDDoS攻撃とみなされる可能性があります。IPをブロックされないように、私は通常asyncio.Semaphoreを使用して、並列実行されるリクエスト数を制限します。

セマフォ(Semaphore)は交通警察のようなもので、一度にトンネルを通過できる車両の数を制限します。

# 最大10件の同時リクエストを許可
limit = asyncio.Semaphore(10)

async def safe_fetch(session, url):
    async with limit:
        async with session.get(url) as response:
            return await response.read()

データのパース過程で複雑な正規表現(Regex)に苦戦しているなら、toolcraft.appの正規表現テストツールを試してみてください。ブラウザ上でパターンを素早くテストできるため、コードを修正してスクリプトを何度も再実行する手間が省けます。

プロフェッショナルなエラー処理とタイムアウト

ネットワークの不安定さは日常茶飯事です。タイムアウトを設定しないと、スクリプトが永久にハングしてしまう可能性があります。

# 全体のタイムアウトを10秒に設定
timeout = aiohttp.ClientTimeout(total=10) 
async with aiohttp.ClientSession(timeout=timeout) as session:
    try:
        async with session.get(url) as resp:
            data = await resp.json()
    except asyncio.TimeoutError:
        print("エラー:サーバーのレスポンスが遅すぎます!")
    except aiohttp.ClientError as e:
        print(f"ネットワーク接続エラー: {e}")

小さいけれど価値のある注意点

長年大規模なスクレイピングシステムを運用してきた経験から、4つの重要なポイントをまとめました:

  1. コンテキストマネージャを常に使用する: エラーが発生してもリソースが確実に解放されるよう、常にasync withを使用してください。
  2. ujsonへのアップグレード: 重いJSONファイルをパースする場合は、ujsonをインストールしましょう。標準ライブラリより3〜5倍速く、CPU負荷を大幅に軽減できます。
  3. DNSキャッシュの活用: aiodnsをインストールするとドメイン解決が速くなります。数百もの異なるドメインを呼び出す場合に非常に有効です。
  4. 引き際を知る: スクリプトが1〜2個のシンプルなAPIを呼び出すだけなら、無理せずrequestsを使いましょう。高いパフォーマンスが本当に必要でない限り、問題を複雑にする必要はありません。

ネットワークの不安定さは日常茶飯事です。タイムアウトを設定しないと、スクリプトが永久にハングしてしまう可能性があります。

最初は非同期の考え方に慣れるまで少し混乱するかもしれません。しかし、一度aiohttpをマスターすれば、大規模なデータを処理するための非常に強力な武器を手に入れることができます。

Share: