RabbitMQ + Python:バックグラウンドタスク処理による遅延を解消する「救世主」的ソリューション

Development tutorial - IT technology blog
Development tutorial - IT technology blog

アプリが「フリーズ」するのは、仕事を抱え込みすぎているから

ECサイトを運営していると仮定しましょう。顧客が「注文」ボタンを押すたびに、システムは膨大なタスクをこなさなければなりません:データベースへの保存、確認メールの送信、アプリへの通知、そしてPDF請求書の生成です。もし、これらのステップがすべて終わるまで、顧客を5〜10秒間も読み込み画面のまま待たせてしまったら、顧客はすぐに離脱してしまうでしょう。

私がジュニアエンジニアだった頃、初歩的なミスを犯したことがあります。ユーザーのリクエストの中で、アクティベーションメールの送信コードを直接実行してしまったのです。その結果、メールサーバーのレスポンスが遅れたり障害が発生したりすると、ウェブサイト全体が「フリーズ」してしまいました。その苦い経験から学んだ教訓は、「即座に結果を返す必要がないもの(ノンブロッキング)は、バックグラウンド(バックグラウンドタスク)に回して処理する」ということです。

ここでRabbitMQが救世主として登場します。RabbitMQは賢いバッファのような役割を果たし、システム内の各コンポーネントが互いの処理完了を待たずに通信できるようにしてくれます。

RabbitMQとは?難解な用語を解読する

RabbitMQはメッセージブローカー(Message Broker)です。郵便局をイメージしてください。送り主から手紙を受け取り、仕分けして、正しい受取人に届けます。覚えておくべき4つの重要な概念があります:

  • Producer: 発信側(例:Webアプリが「請求書を作成して」と要求を送る)。
  • Queue: メッセージを蓄積する待ち行列。他のサーバーが処理できるようになるまで、メッセージを安全に保持します。
  • Consumer: 実行側(例:PDF生成だけを専門に行うサービス)。
  • Exchange: ルーター。設定したルールに基づいて、メッセージをどのQueueに送るかを決定します。

RabbitMQとRedisのPub/Subで迷う人も多いでしょう。最大の違いは「安全性」にあります。RabbitMQは非常に堅牢です。後述するAck(確認応答)の仕組みにより、サーバーが突然クラッシュしてもメッセージが失われないことが保証されます。

一瞬で環境を構築する

OSに直接インストールして環境を汚す代わりに、Dockerを使ってスマートに構築しましょう。以下のコマンドを実行すれば、管理画面付きのRabbitMQサーバーを素早く立ち上げられます:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

ポート 5672 はコードからの接続用、15672 は管理ダッシュボード用です(http://localhost:15672 でアクセス可能。デフォルトのユーザー/パスワードは guest/guest)。次に、Python用のライブラリ pika をインストールします:

pip install pika

実践:最初のメッセージを送受信する

1. 送信側 (Producer)

このファイルを send.py という名前で保存してください。このコードは単に「hello」キューに挨拶を投げ入れるだけのものです。

import pika

# RabbitMQに接続
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# メッセージを保持するキューを作成
channel.queue_declare(queue='hello')

# キューにメッセージを送信
channel.basic_publish(exchange='', routing_key='hello', body='おはようございます!')

print(" [x] メッセージの送信に成功しました!")
connection.close()

2. 受信側 (Consumer)

receive.py を作成します。これは常に待機し、メッセージが届くとすぐに「キャッチ」して処理します。

import pika

def callback(ch, method, properties, body):
    print(f" [x] コマンドを受信: {body.decode()}")

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

# 継続的なリスニングモードを設定
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] メッセージを待機中。終了するには CTRL+C を押してください...')
channel.start_consuming()

2つのターミナルを開き、一方で受信側、もう一方で送信側を実行してテストしてください。メッセージがほぼ瞬時に転送されるのがわかります。

システムをプロフェッショナルレベルにアップグレードする (Production Ready)

実際、画像の圧縮や1,000件のマーケティングメール送信には数分かかることがあります。タスクの流入量が多すぎると、1つのConsumerでは処理しきれなくなります。システムを「不死身」にするための2つのテクニックを紹介します:

1. Message Acknowledgment (確認応答)

処理の途中でサーバーがダウンしてもデータが失われないようにしましょう。auto_ack=True をオフにします。コードの処理が完全に完了した時にのみ、RabbitMQに basic_ack を送信するようにします。処理中にConsumerがダウンした場合、RabbitMQは自動的にそのタスクを別のConsumerに割り当て直します。

2. Message Durability (永続性)

デフォルトでは、RabbitMQを再起動するとすべてのキューが消えてしまいます。キュー作成時に durable=True を宣言することで、電源トラブルなどが発生してもデータがディスク上に保持されるようにします。

以下は、堅牢なConsumerの実装例です:

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# サーバーが再起動してもこのキューは削除されません
channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    print(f" [x] 重いタスクを処理中: {body.decode()}")
    time.sleep(5) # 5秒間の処理をシミュレート
    print(" [x] 完了!")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 前のタスクが終わるまで新しいタスクを受け取らない (Fair dispatch)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

おわりに

RabbitMQを導入することで、アプリが高速化するだけでなく、拡張性も飛躍的に向上します。タスクが溜まりすぎていると感じたら、コードを一行も変更することなく、ワーカー(Consumer)を5〜10個追加して負荷を分散させるだけで解決します。

私からの最後のアドバイス:単純な足し算のような軽い処理にMessage Queueを乱用しないでください。200ms以上かかるタスクや、外部API(SMS、メール、決済など)に依存する処理に活用しましょう。安定したシステムの構築を応援しています!

Share: