Celery & Redis: Pythonアプリケーションを高速化するバックグラウンドタスク処理ソリューション

Python tutorial - IT technology blog
Python tutorial - IT technology blog

なぜあなたのアプリケーションはレスポンスが遅いのか?

レスポンスの遅延は、ユーザーエクスペリエンスにとって「死の接吻」です。Amazonの統計によると、わずか100ms의 遅延でも売上が1%減少する可能性があります。顧客が「注文する」をクリックした際、システムはデータベースの保存、PDFの作成、メール送信、Telegramへの通知など、一連の処理を行う必要があります。これらを逐次実行すると、ユーザーはページの読み込み完了まで5〜10秒待たされることになります。

問題は**同期(Synchronous)**メカニズムにあります。Pythonはコードを1行ずつ処理するため、メール送信のような重いI/Oタスクに遭遇するとブロックされてしまいます。これを解決するには、**バックグラウンドタスク(Background Tasks)**が必要です。

**Celery**をプロのバリスタ、**Redis**を注文票の束と考えてみてください。自分でコーヒーを淹れてから会計をするのではなく、注文票をスタッフに渡して、すぐに次の顧客の対応を続けるようなものです。これにより、システムは非常にスムーズに動作します。

環境のクイックセットアップ

CeleryライブラリとメッセージブローカーであるRedisの2つのコンポーネントが必要です。Redisは、処理待ちのタスクを保存・転送する役割を担います。

Dockerを使用してRedisを最も速くデプロイする方法:

docker run -d -p 6379:6379 redis

Ubuntuを使用している場合は、aptコマンドを使用します:

sudo apt install redis-server

必要なPythonライブラリをインストールします:

pip install celery redis

CeleryとRedisの設定

バックグラウンドで実行するタスクを定義するために tasks.py ファイルを作成します。

from celery import Celery
import time

# Celeryの初期化: Redisがブローカーとバックエンドの両方の役割を果たす
app = Celery('my_tasks', 
             broker='redis://localhost:6379/0', 
             backend='redis://localhost:6379/0')

@app.task
def send_email_task(email_address):
    print(f"{email_address} へのメール送信を開始します...")
    time.sleep(5) # 実際の遅延をシミュレート
    return f"{email_address} への送信に成功しました"

上記の構成では、brokerは命令を受け取る場所であり、backendは実行結果を保存する場所です。アプリケーションをフリーズさせずにこのタスクを呼び出すには、通常の関数呼び出しではなく、.delay() メソッドを使用します。

main.py ファイルでテストします:

from tasks import send_email_task

print("1. リクエストを受信しました。")
send_email_task.delay("[email protected]")
print("2. キューに追加されました。即座にレスポンスを返します!")

python main.py を実行すると、結果がすぐに表示されます。メール送信タスクはこの時点でRedis内に安全に格納されています。

ヒント: Celeryへの入力データを処理する際、私はよくメール形式や複雑な文字列をチェックするために regex tester を使用します。これにより、キューに送られるデータが常に正確であることを保証し、単純なフォーマットエラーでワーカーが停止するのを防ぐことができます。

ワーカーの運用と監視

Redisにデータが入ったので、次はCeleryという「スタッフ」を呼び起こす番です。ターミナルを開き、次のコマンドを実行します:

celery -A tasks worker --loglevel=info

ワーカーはRedisに接続し、保留中のタスクを処理します。数秒後、ログに Task succeeded in 5.0s と表示されるのが確認できるはずです。

Flowerによる数千件のタスク管理

システムが1分間に数千のタスクを処理する場合、ターミナルのログを読むのは悪夢です。Flowerは、直感的なWebインターフェースを提供する救世主的なソリューションです。

pip install flower
celery -A tasks flower

http://localhost:5555 にアクセスして、リアルタイムのチャートを確認します。どのタスクが失敗したか、どのタスクが滞っているかを正確に把握でき、タイムリーにリソースをスケールアップすることが可能です。

Celeryを使用する際の3つの黄金律

  • **不変性(Idempotency)の設計:** タスクを複数回実行してもデータエラーが発生しないようにします。次のアクションを実行する前に、必ず注文ステータスなどを確認してください。
  • **IDの転送を優先する:** 重いユーザーオブジェクト全体をRedis経由で送信しないでください。user_id のみを送信し、ワーカーが必要に応じてデータベースから最新データを再取得するようにします。
  • **リトライ設定:** SendGridのようなサードパーティAPIは、一時的なエラーが発生することがよくあります。autoretry_for を使用して、一定時間後にCeleryが自動的に再試行するように設定しましょう。

CeleryとRedisを導入することで、メインの処理フローとサブの処理フローを分離できます。アプリケーションのレスポンスが向上し、負荷耐性が高まり、ユーザーにプロフェッショナルな体験を提供できるようになります。

Share: