課題:中小規模プロジェクトにおけるRabbitMQのオーバーヘッド
かつてスタートアップで通知サービスを開発していた際、システムが処理する必要があるのは1日あたり約5万イベント程度でした。当時のチームは「業界標準」という評判を聞き、RabbitMQの採用を決定しました。結果はどうだったでしょうか? 2GBのRAMを搭載したVPSで、負荷がない状態でもErlangがサービス維持のためだけに約500MBを消費していました。また、単純なメッセージの送受信というニーズに対して、Exchange, Queue, Bindingの設定はあまりにも煩雑すぎました。
実際、そのプロジェクトでは既にキャッシュとしてRedisを使用していました。RabbitMQクラスターを別途運用することは、運用コストを急増させる一方で、それに見合う付加価値はほとんどありませんでした。そこで私は考え始めました。「Redisをプロフェッショナルなメッセージキュー(MQ)として活用する方法はないだろうか? メッセージの紛失を防ぎ、複数のワーカーに負荷を分散させることは可能なのか?」と。
分析:なぜRedisのPub/SubやListでは「本格的なMQ」が務まらないのか?
バージョン5.0が登場する前、私たちはよく知られた2つの方法でMQを「模倣」していましたが、それらには大きなリスクが伴いました。
- Redis List (LPUSH/BRPOP): これは水道管のようなもので、一方の端から押し込み、もう一方から取り出します。シンプルですが、「ファンアウト」(1つのメッセージを複数の受信者に送る)機能がありません。最も危険なのは、ワーカーがメッセージを取り出した直後にクラッシュした場合、そのメッセージは完全に消失してしまう点です。
- Redis Pub/Sub: このメカニズムは純粋な「投げっぱなし(Fire and Forget)」のリアルタイム方式です。パブリッシャーが送信している最中にサブスクライバーの接続が切れると、メッセージは虚空へと消えてしまいます。履歴(History)の保持や確認応答(ACK)のメカニズムも一切ありません。
このギャップを埋めるために誕生したのが「Redis Streams」です。Listの永続性と柔軟な配信能力を兼ね備えています. Kafkaのような強力なコンシューマーグループ(Consumer Groups)機能を、極めて低いリソース消費で利用できるようになります。
RedisエコシステムにおけるMQの実装オプション
課題の複雑さに応じて、以下の3つのアプローチから選択できます。
- List(シンプルキュー)の使用: データの紛失を許容できる、単純なバックグラウンドジョブに適しています。
- Pub/Subの使用: チャットアプリや、ユーザーがオフラインであれば再確認の必要がないプッシュ通知に適しています。
- Redis Streamsの使用: マイクロサービスや、「少なくとも1回の配信(At-least-once delivery)」を保証する必要があるイベント駆動型システムにとって、最も優れた選択肢です。
実践的な解決策:Redis Streamsによるメッセージキューの構築
Streamsは、末尾への追記のみを許可するログファイルのように動作します。タイムスタンプに基づいたIDと共にキー・バリューのペアを保存するため、追跡(トレーサビリティ)が非常に容易です。
基本操作:ストリームへのデータ投入
LPUSHの代わりに、XADDコマンドを使用します。*記号は、Redisに一意のIDを自動生成させるための指示です。
# ストリーム 'orders' に注文を追加
redis-cli XADD orders * user_id 1001 item "iPhone 15" status "pending"
Redisは 1712745600000-0 のようなIDを返します。このIDを使用して、いつでもメッセージの状態を確認できます。
コンシューマーグループによるインテリジェントな負荷分散
これこそが最大の特徴です。ワーカーをグループ化してバックログを分担処理できます。ワーカーAがビジーな場合、ワーカーBが自動的に次のメッセージを取得します。
# ストリーム 'orders' に対してグループ 'order_processors' を作成
# '$' は、今この瞬間から発生する新しいメッセージのみを受信することを意味します
redis-cli XGROUP CREATE orders order_processors $
その後、ワーカーは XREADGROUP コマンドを使用してメッセージを読み取ります。
# ワーカー 'worker_1' が、まだ誰も手をつけていない新しいメッセージを1つ取得(IDは '>')
redis-cli XREADGROUP GROUP order_processors worker_1 COUNT 1 STREAMS orders >
ACKメカニズムとXPENDING:メッセージの行方を見失わないために
実運用では、サードパーティのAPIを呼び出している最中にワーカーが「停止」してしまうことがよくあります。Redis StreamsはこれをPEL (Pending Entries List)によって解決します。メッセージを読み取ると、それは一時的にPELに保持されます。XACKコマンドを送信して初めて、Redisは処理完了を認識します。
# 処理成功を確認(確定)
redis-cli XACK orders order_processors 1712745600000-0
メッセージが長時間PELに留まっている場合は、XCLAIMを使用して別のワーカーに再割り当てし、処理をやり直させます。これにより、外部の監視ツールを使わずにロジックエラーを非常にスマートに処理できます。
Pythonによる実装例
以下は、基本的なワーカーを実装するために私がよく使用するサンプルコードです。
import redis
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
stream_name = 'orders'
group_name = 'order_processors'
consumer_name = 'worker_v1'
# グループとストリームが存在しない場合は作成
try:
r.xgroup_create(stream_name, group_name, id='0', mkstream=True)
except redis.exceptions.ResponseError:
pass
while True:
# まだACKされていない新しいメッセージを読み取る
messages = r.xreadgroup(group_name, consumer_name, {stream_name: '>'}, count=1)
for stream, content in messages:
for message_id, data in content:
print(f"注文を処理中: 顧客 {data['user_id']} の {data['item']}")
# 処理ロジックのシミュレーション
time.sleep(0.5)
# 完了を確認(ACK送信)
r.xack(stream_name, group_name, message_id)
print(f"処理完了: {message_id}")
現場の教訓:Redisにメモリを「食いつぶされない」ための注意点
非常に強力ですが、使い方を誤るとRedisはすぐにサーバーをダウンさせてしまいます。以下のルールを忘れないでください:
- 常にストリームの長さを制限する: RedisはデータをRAM上に保持します。際限なく
XADDしないでください。MAXLEN ~ 10000を使用して、1万件に達した際に古いレコードを自動的に削除するようにしましょう。 - XINFOで監視する: 定期的に
XINFO GROUPS ordersを実行してください。Pendingの数が急増している場合は、すぐにワーカーをスケールアウトする必要があるサインです。 - Redis Sentinelを使用する: MQはシステムの心臓部です。単一障害点にならないよう、Sentinelをセットアップしてマスターがダウンした場合の自動フェイルオーバーを有効にし、システム全体の停滞を防ぎましょう。
- データベースを分離する: キャッシュとMQの両方に同じDB 0を使用しないでください。リソースを分離することで、キャッシュの肥大化がメッセージキューを道連れにする事態を避けられます。
要するに、Redis Streamsは強力でありながら非常に洗練された武器です。高い処理速度とリソースの節約が求められるプロジェクトにおいて、私はKafkaやRabbitMQのような「巨人」を検討する前に、常にこれを優先して選択しています。

