Vấn đề: Khi RabbitMQ quá nặng nề cho một dự án vừa và nhỏ
Hồi làm Notification Service cho một startup, hệ thống của mình chỉ cần xử lý khoảng 50.000 event mỗi ngày. Team lúc đó quyết định dùng RabbitMQ vì nghe danh “chuẩn công nghiệp”. Kết quả là gì? Một con VPS 2GB RAM chạy Erlang ngốn gần 500MB chỉ để duy trì service khi chưa có tải. Việc cấu hình Exchange, Queue, Binding cũng trở nên quá rườm rà cho một nhu cầu đơn giản là đẩy và nhận tin nhắn.
Thực tế, dự án vốn đã dùng Redis để Cache. Việc phải nuôi thêm một cụm RabbitMQ khiến chi phí vận hành tăng vọt mà không mang lại nhiều giá trị thặng dư. Mình bắt đầu tự hỏi: Liệu có cách nào tận dụng luôn Redis để làm Message Queue (MQ) chuyên nghiệp không? Liệu nó có đảm bảo tin nhắn không bị mất và chia tải được cho nhiều worker?
Phân tích: Tại sao Redis Pub/Sub hay List không gánh nổi MQ thực thụ?
Trước bản 5.0, anh em mình thường phải “chế cháo” MQ qua hai cách quen thuộc nhưng đầy rủi ro:
- Redis List (LPUSH/BRPOP): Nó giống như một cái ống nước, đẩy vào một đầu và rút ra đầu kia. Cách này đơn giản nhưng thiếu tính năng “Fan-out” (một tin nhắn gửi cho nhiều người nhận). Nguy hiểm nhất là nếu worker lấy tin nhắn ra rồi bị crash, tin nhắn đó coi như bốc hơi hoàn toàn.
- Redis Pub/Sub: Cơ chế này thuần real-time theo kiểu “bắn và quên”. Nếu subscriber mất kết nối đúng lúc publisher đang gửi, tin nhắn sẽ bay vào hư không. Nó hoàn toàn không có lịch sử (history) hay cơ chế xác nhận (ACK).
Để lấp đầy khoảng trống này, Redis Streams đã ra đời. Nó kết hợp sự bền bỉ của List với khả năng phân phối linh hoạt. Bạn sẽ sở hữu tính năng Consumer Groups mạnh mẽ như Kafka nhưng với mức tiêu thụ tài nguyên cực thấp.
Các phương án triển khai MQ trong hệ sinh thái Redis
Tùy vào độ phức tạp của bài toán, bạn có thể chọn một trong ba hướng đi sau:
- Sử dụng List (Simple Queue): Dùng cho các background job đơn giản, không sợ mất dữ liệu.
- Sử dụng Pub/Sub: Phù hợp cho app chat hoặc thông báo đẩy mà người dùng offline thì thôi, không cần xem lại.
- Sử dụng Redis Streams: Đây là lựa chọn sáng giá nhất cho Microservices hoặc hệ thống Event-driven cần đảm bảo tin nhắn phải được xử lý ít nhất một lần (At-least-once delivery).
Giải pháp thực chiến: Xây dựng Message Queue với Redis Streams
Streams hoạt động như một file log chỉ cho phép ghi đè ở cuối. Nó lưu trữ các cặp key-value kèm theo ID dựa trên timestamp, giúp việc truy vết cực kỳ dễ dàng.
Thao tác cơ bản: Đẩy dữ liệu vào Stream
Thay vì LPUSH, chúng ta dùng lệnh XADD. Ký tự * là câu lệnh yêu cầu Redis tự sinh ID duy nhất.
# Thêm một đơn hàng vào stream 'orders'
redis-cli XADD orders * user_id 1001 item "iPhone 15" status "pending"
Redis sẽ trả về một ID như 1712745600000-0. Bạn có thể dùng ID này để kiểm tra trạng thái tin nhắn bất cứ lúc nào.
Chia tải thông minh với Consumer Groups
Đây là điểm “ăn tiền” nhất. Bạn có thể gom các worker vào một nhóm để chia nhau xử lý backlog. Nếu worker A đang bận, worker B sẽ tự động lấy tin nhắn tiếp theo.
# Tạo nhóm 'order_processors' cho stream 'orders'
# '$' nghĩa là chỉ nhận tin nhắn mới phát sinh từ bây giờ
redis-cli XGROUP CREATE orders order_processors $
Sau đó, worker sẽ đọc tin nhắn bằng lệnh XREADGROUP:
# Worker 'worker_1' lấy 1 tin nhắn mới chưa ai chạm vào (ID là '>')
redis-cli XREADGROUP GROUP order_processors worker_1 COUNT 1 STREAMS orders >
Cơ chế ACK và XPENDING: Không bao giờ mất dấu tin nhắn
Trong thực tế, worker rất dễ bị “ngỏm” khi đang gọi API bên thứ ba. Redis Streams giải quyết việc này qua danh sách PEL (Pending Entries List). Khi bạn đọc một tin nhắn, nó sẽ tạm nằm trong PEL. Chỉ khi bạn gửi lệnh XACK, Redis mới xác nhận là đã xong.
# Xác nhận xử lý thành công
redis-cli XACK orders order_processors 1712745600000-0
Nếu tin nhắn nằm trong PEL quá lâu, bạn hãy dùng XCLAIM để gán nó cho worker khác xử lý lại. Đây là cách mình xử lý lỗi logic cực kỳ nhàn mà không cần tool monitor ngoài.
Ví dụ code Python thực tế
Dưới đây là đoạn code mẫu mình thường dùng để triển khai một worker cơ bản:
import redis
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
stream_name = 'orders'
group_name = 'order_processors'
consumer_name = 'worker_v1'
# Tạo group và stream nếu chưa có
try:
r.xgroup_create(stream_name, group_name, id='0', mkstream=True)
except redis.exceptions.ResponseError:
pass
while True:
# Đọc tin nhắn mới chưa được ACK
messages = r.xreadgroup(group_name, consumer_name, {stream_name: '>'}, count=1)
for stream, content in messages:
for message_id, data in content:
print(f"Đang xử lý đơn: {data['item']} cho khách {data['user_id']}")
# Giả lập logic xử lý
time.sleep(0.5)
# Xác nhận hoàn tất
r.xack(stream_name, group_name, message_id)
print(f"Xong đơn: {message_id}")
Kinh nghiệm xương máu: Lưu ý để Redis không ‘nuốt’ RAM
Dù rất mạnh, nhưng nếu dùng sai cách, Redis sẽ sớm làm server của bạn “bay màu”. Đừng quên các quy tắc sau:
- Luôn giới hạn độ dài Stream: Redis lưu dữ liệu trên RAM. Đừng
XADDvô tận. Hãy dùngMAXLEN ~ 10000để tự động xóa các bản ghi cũ khi đạt giới hạn 10.000 tin. - Monitor bằng XINFO: Hãy thường xuyên chạy
XINFO GROUPS orders. Nếu số lượng Pending tăng đột biến, đó là tín hiệu bạn cần scale thêm worker ngay lập tức. - Sử dụng Redis Sentinel: MQ là trái tim của hệ thống. Đừng để nó chạy đơn lẻ. Hãy setup Sentinel để tự động failover nếu master die, tránh việc tắc nghẽn toàn hệ thống.
- Tách biệt Database: Đừng dùng chung DB 0 cho cả Cache và MQ. Việc tách biệt tài nguyên giúp tránh tình trạng Cache phình to làm chết Message Queue.
Nói tóm lại, Redis Streams là vũ khí hạng nặng nhưng lại cực kỳ thanh thoát. Với các dự án cần tốc độ cao và tiết kiệm tài nguyên, mình luôn ưu tiên nó trước khi nghĩ tới những “gã khổng lồ” như Kafka hay RabbitMQ.

