This implementation migrates the Redis message queue system from Lists/Sorted Sets to Redis Streams with proper message acknowledgements and consumer groups.
Before (Lists/Sorted Sets):
- Immediate messages: `LPUSH` to Redis Lists
- Delayed messages: `ZADD` to Sorted Sets with timestamp scores
After (Redis Streams):
- Immediate messages: `XADD` with auto-generated IDs
- Delayed messages: `XADD` with timestamp-based IDs (format: `{timestamp_ms}-0`)
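The ID scheme above can be sketched as follows. This is a minimal sketch, not the actual implementation: `delayed_stream_id` and `push_message` are illustrative names, and `client` is assumed to be a redis-py `redis.asyncio.Redis` instance.

```python
import json
import time
from typing import Optional

def delayed_stream_id(delay_seconds: float, now: Optional[float] = None) -> str:
    """Build an explicit stream ID of the form {timestamp_ms}-0 for a delayed message."""
    base = time.time() if now is None else now
    return f"{int((base + delay_seconds) * 1000)}-0"

async def push_message(client, queue_name: str, payload: dict, delay_seconds: float = 0):
    # client: assumed redis.asyncio.Redis; with no explicit id, XADD auto-generates one
    stream = f"{queue_name}:stream"
    fields = {"data": json.dumps(payload)}
    if delay_seconds > 0:
        # Future-dated ID: consumers skip it until the timestamp passes
        return await client.xadd(stream, fields, id=delayed_stream_id(delay_seconds))
    return await client.xadd(stream, fields)
```

One caveat the sketch ignores: Redis requires explicit IDs to be strictly increasing per stream, so scheduling delayed messages out of timestamp order would need sequence-number handling.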
Example:

```python
# Immediate message
await redis_push_async(message, delay_seconds=0)

# Delayed message (5 minutes)
await redis_push_async(message, delay_seconds=300)
```

Before:
- Used `BRPOP` to block and pop from Lists
- No acknowledgements: messages lost if a consumer crashed
- Manual load balancing with multiple consumers
After:
- Uses `XREADGROUP` with consumer groups
- Automatic load balancing across consumers
- Messages acknowledged with `XACK` after successful processing
- Pending Entries List (PEL) for failed-message retries
Key Features:
- Consumer Groups: Automatic load balancing
- Acknowledgements: Messages only removed after `XACK`
- Pending Messages: Failed messages stay in the PEL for retry
- Timestamp Validation: Skips future-dated messages automatically
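A consumer loop with the features above might look like this sketch. It assumes redis-py's asyncio client with `decode_responses=True` (so IDs arrive as `str`), and `handle` stands in for the real message processor:

```python
import time

def id_is_ready(message_id: str, now_ms: int) -> bool:
    """A message is ready once the timestamp half of its ID is <= now."""
    return int(message_id.split("-")[0]) <= now_ms

async def consume(client, stream: str, group: str, consumer: str, handle) -> None:
    # client: assumed redis.asyncio.Redis; handle: async callable for one message
    while True:
        # ">" asks for messages never delivered to any consumer in the group
        resp = await client.xreadgroup(group, consumer, {stream: ">"},
                                       count=10, block=5000)
        now_ms = int(time.time() * 1000)
        for _name, messages in resp or []:
            for message_id, fields in messages:
                if not id_is_ready(message_id, now_ms):
                    continue  # future-dated: leave unacked so it stays in the PEL
                try:
                    await handle(fields)
                except Exception:
                    continue  # no XACK on failure: stays in the PEL for retry
                await client.xack(stream, group, message_id)  # done: drop from PEL
```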
Before:
- Polled Sorted Sets with `ZRANGEBYSCORE`
- Moved ready messages back to Lists
After:
- Monitors Redis Streams for ready delayed messages
- Uses `XRANGE` to find messages with timestamps <= current time
- Messages are automatically consumed by the main consumer when ready
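The `XRANGE` check could be sketched like this, assuming redis-py's asyncio client; `due_entries` and `scan_ready` are illustrative names, not the actual monitor code:

```python
import time

def due_entries(entries, now_ms: int):
    """Filter (id, fields) pairs whose ID timestamp is <= now."""
    return [(mid, fields) for mid, fields in entries
            if int(mid.split("-")[0]) <= now_ms]

async def scan_ready(client, stream: str):
    # XRANGE up to the current millisecond; Redis treats an end ID without a
    # sequence part ("{now_ms}") as {now_ms}-<max sequence>.
    now_ms = int(time.time() * 1000)
    entries = await client.xrange(stream, min="-", max=str(now_ms))
    return due_entries(entries, now_ms)
```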
New utility module for managing consumer groups:
- `ensure_consumer_group()`: Creates consumer groups on startup
- `initialize_consumer_groups()`: Sets up groups for all queues
- `get_pending_messages()`: Retrieves unacknowledged messages
- `claim_pending_messages()`: Claims idle messages from failed consumers
- `get_consumer_group_info()`: Gets group statistics
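`ensure_consumer_group()` presumably wraps `XGROUP CREATE`; a sketch of an idempotent version (the `BUSYGROUP` check makes repeated startups safe), assuming redis-py's asyncio client:

```python
def is_busygroup_error(message: str) -> bool:
    """XGROUP CREATE replies BUSYGROUP when the group already exists."""
    return "BUSYGROUP" in message

async def ensure_consumer_group(client, stream: str, group: str) -> None:
    # MKSTREAM creates an empty stream if it does not exist yet;
    # id="0" makes the group start from the beginning of the stream.
    try:
        await client.xgroup_create(stream, group, id="0", mkstream=True)
    except Exception as exc:  # redis.exceptions.ResponseError in redis-py
        if not is_busygroup_error(str(exc)):
            raise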
1. Producer calls `redis_push_async(message, delay_seconds=0)`
   - Message added to stream: `{queue_name}:stream`
   - For delayed messages, ID format: `{future_timestamp_ms}-0`
2. Consumer reads with `XREADGROUP`
   - Reads from consumer group: `main-group`
   - Each consumer has a unique name: `{hostname}-{pid}`
   - Messages distributed across consumers automatically
3. Processing
   - Consumer validates the message timestamp (skips future messages)
   - Processes the message
   - On success: `XACK` removes it from the PEL
   - On failure: message stays in the PEL for retry
4. Pending Messages
   - Background task checks the PEL every 60 seconds
   - Claims messages idle for more than 60 seconds
   - Retries processing
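The claim step could be sketched with redis-py's `xpending_range`/`xclaim` methods; `pending_ids` and `claim_idle` are illustrative names, and the 60-second threshold mirrors the interval above:

```python
def pending_ids(entries):
    """Extract message IDs from XPENDING detail entries (dicts in redis-py)."""
    return [entry["message_id"] for entry in entries]

async def claim_idle(client, stream: str, group: str, consumer: str,
                     idle_ms: int = 60_000):
    # List entries that have been pending longer than idle_ms...
    entries = await client.xpending_range(
        stream, group, min="-", max="+", count=100, idle=idle_ms)
    ids = pending_ids(entries)
    if not ids:
        return []
    # ...and transfer ownership to this consumer so it can retry them.
    return await client.xclaim(stream, group, consumer,
                               min_idle_time=idle_ms, message_ids=ids)
```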
Stream Name: `{queue_name}:stream`
Consumer Group: `main-group`
Consumer Names: `{hostname}-{pid}`

Message Format:

```
{
    "data": "{json_encoded_message}",
    ...
}
```

Message ID Format:
- Immediate: Auto-generated (e.g., `1234567890123-0`)
- Delayed: Timestamp-based (e.g., `1234567895000-0`)
- Messages persist until acknowledged
- No message loss on consumer crashes
- Automatic retry via PEL
- Consumer groups automatically distribute messages
- No manual queue assignment needed
- Easy horizontal scaling
- Track pending messages per consumer
- Monitor consumer group health
- Better debugging capabilities
- More efficient than Lists for high throughput
- Better memory usage than Sorted Sets
- Supports batch operations
The old `redis_lpush()` function still works but now uses Streams internally. However, you should migrate to `redis_push_async()` for async operations.
Consumer groups are automatically created on first startup. If you need to reset:
```python
# Delete and recreate consumer group
await redis_client.xgroup_destroy(stream_name, group_name)
await ensure_consumer_group(stream_name, group_name)
```

Check consumer group status:
```python
from app.services.stream_consumer_groups import get_consumer_group_info

info = await get_consumer_group_info("notifications:stream", "main-group")
print(f"Pending: {info['pending']}, Consumers: {info['consumers']}")
```

No configuration changes needed. The system uses the existing `QUEUE_NAMES` setting.
- Test Immediate Messages:

  ```python
  message = {
      "queue_name": "notifications",
      "operation": "send_email",
      "data": {...}
  }
  await redis_push_async(message)
  ```

- Test Delayed Messages:

  ```python
  await redis_push_async(message, delay_seconds=60)
  ```

- Check Pending Messages:

  ```python
  from app.services.stream_consumer_groups import get_pending_messages

  pending = await get_pending_messages("notifications:stream", "main-group")
  ```
- Check the consumer group exists:

  ```python
  await ensure_consumer_group("queue:stream", "main-group")
  ```
- Check for pending messages:

  ```python
  pending = await get_pending_messages("queue:stream", "main-group")
  ```
- Claim and retry pending messages:

  ```python
  claimed = await claim_pending_messages("queue:stream", "main-group", "consumer-1")
  ```
Streams can grow large. Consider:
- Setting `MAXLEN` when adding messages (not implemented yet)
- Periodic cleanup of old messages
- Monitoring stream length with `XLEN`
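The not-yet-implemented trimming could look like this sketch. It assumes redis-py (and Redis >= 6.2 for the `MINID` option); `retention_min_id`, `add_with_cap`, and `trim_by_age` are illustrative names:

```python
import time

def retention_min_id(retention_seconds: float, now: float) -> str:
    """Smallest ID to keep when trimming by age: everything older is dropped."""
    return f"{int((now - retention_seconds) * 1000)}-0"

async def add_with_cap(client, stream: str, fields: dict, cap: int = 10_000):
    # approximate=True maps to MAXLEN ~ : Redis trims in whole macro nodes,
    # which is much cheaper than an exact trim.
    return await client.xadd(stream, fields, maxlen=cap, approximate=True)

async def trim_by_age(client, stream: str, retention_seconds: float):
    # XTRIM MINID drops entries whose ID timestamp is older than the cutoff.
    cutoff = retention_min_id(retention_seconds, time.time())
    return await client.xtrim(stream, minid=cutoff)
```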
- Stream Trimming: Auto-trim old messages with `XADD ... MAXLEN`
- Dead Letter Queue: Move failed messages after max retries
- Metrics: Add Prometheus metrics for monitoring
- Stream Replication: Support for Redis Cluster