
rabbitmq-master
Ultimate RabbitMQ expertise skill for production-grade message broker architecture, implementation, and operations. Top 0.01% knowledge covering: (1) Advanced messaging patterns - Dead Letter Exchanges, Delayed Messages, Priority Queues, Consistent Hash Exchange, Sharding, (2) High Availability - Clustering, Quorum Queues, Stream Queues, Federation, Shovel, (3) Performance Engineering - prefetch tuning, connection pooling, batch publishing, memory optimization, flow control, (4) Security - TLS/mTLS, OAuth2, LDAP, certificate rotation, (5) Monitoring - Prometheus metrics, custom health checks, anomaly detection, (6) Troubleshooting - memory alarms, network partitions, queue backlogs, consumer starvation, (7) Multi-tenancy - vhost design, resource limits, isolation patterns, (8) Event-driven architectures - CQRS, Event Sourcing, Saga patterns with RabbitMQ. Use when: building messaging systems, debugging RabbitMQ issues, optimizing performance, designing HA architectures, implementing advanced patterns, production hardening, capacity planning, migration strategies.
|
RabbitMQ Master Skill
Expert-level RabbitMQ knowledge for building bulletproof messaging systems.
Quick Reference
Connection Best Practices
# WRONG - Connection per message (kills performance)
def send_bad(msg):
conn = pika.BlockingConnection(params) # 7-way TCP handshake + AMQP handshake
ch = conn.channel()
ch.basic_publish(...)
conn.close()
# CORRECT - Connection pooling with heartbeat
import pika
from pika import ConnectionParameters, PlainCredentials
params = ConnectionParameters(
host='rabbitmq.prod',
port=5672,
credentials=PlainCredentials('user', 'pass'),
heartbeat=60, # Detect dead connections
blocked_connection_timeout=300, # Handle flow control
connection_attempts=3,
retry_delay=5,
socket_timeout=10,
stack_timeout=15,
# CRITICAL: TCP keepalive untuk cloud/NAT environments
tcp_options={'TCP_KEEPIDLE': 60, 'TCP_KEEPINTVL': 10, 'TCP_KEEPCNT': 3}
)
# Use connection pool - see scripts/connection_pool.py
Channel Best Practices
# Channels are NOT thread-safe - use 1 channel per thread
# Channels are cheap - create many, but not per message
# OPTIMAL: Dedicated channels per purpose
publish_channel = conn.channel()
publish_channel.confirm_delivery() # Enable publisher confirms
consume_channel = conn.channel()
consume_channel.basic_qos(prefetch_count=50) # Tuned prefetch
Core Patterns
1. Reliable Publishing (Publisher Confirms)
# Synchronous confirms (simple, slower)
channel.confirm_delivery()
try:
channel.basic_publish(
exchange='orders',
routing_key='new',
body=json.dumps(order),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
content_type='application/json',
message_id=str(uuid4()), # Idempotency key
timestamp=int(time.time()),
headers={'retry_count': 0}
),
mandatory=True # Return if unroutable
)
except pika.exceptions.UnroutableError:
handle_unroutable()
except pika.exceptions.NackError:
handle_nack()
# Asynchronous confirms (complex, 10x faster) - see scripts/async_publisher.py
2. Reliable Consuming
def callback(ch, method, properties, body):
try:
# ALWAYS process idempotently using message_id
if is_duplicate(properties.message_id):
ch.basic_ack(method.delivery_tag)
return
process_message(body)
mark_processed(properties.message_id)
ch.basic_ack(method.delivery_tag)
except RecoverableError as e:
# Requeue with exponential backoff via DLX
retry_count = (properties.headers or {}).get('retry_count', 0)
if retry_count < MAX_RETRIES:
republish_with_delay(ch, body, retry_count + 1)
ch.basic_ack(method.delivery_tag) # Ack original
else:
ch.basic_nack(method.delivery_tag, requeue=False) # To DLQ
except FatalError:
# Permanent failure - dead letter immediately
ch.basic_nack(method.delivery_tag, requeue=False)
channel.basic_qos(prefetch_count=50) # CRITICAL - tune this!
channel.basic_consume(queue='orders', on_message_callback=callback)
3. Dead Letter Exchange Pattern
# DLX captures: rejected, expired, queue-full messages
channel.exchange_declare('dlx.exchange', 'direct', durable=True)
channel.queue_declare('dlq.orders', durable=True)
channel.queue_bind('dlq.orders', 'dlx.exchange', 'orders')
# Main queue with DLX
channel.queue_declare(
'orders',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx.exchange',
'x-dead-letter-routing-key': 'orders',
'x-message-ttl': 86400000, # 24h max age
'x-max-length': 1000000, # Max 1M messages
'x-overflow': 'reject-publish-dlx' # DLX on overflow
}
)
4. Delayed/Scheduled Messages
# Method 1: Plugin (rabbitmq_delayed_message_exchange)
channel.exchange_declare(
'delayed.exchange',
'x-delayed-message',
arguments={'x-delayed-type': 'direct'}
)
channel.basic_publish(
exchange='delayed.exchange',
routing_key='scheduled',
body=payload,
properties=pika.BasicProperties(
headers={'x-delay': 60000} # 60 seconds delay
)
)
# Method 2: TTL + DLX chain (no plugin needed) - see references/patterns.md
5. Priority Queues
# CAUTION: Priority queues have overhead, use sparingly
channel.queue_declare(
'priority.orders',
durable=True,
arguments={
'x-max-priority': 10, # 1-10 priorities, keep low!
'x-queue-type': 'classic' # Not supported on quorum
}
)
# Publishing with priority
channel.basic_publish(
exchange='',
routing_key='priority.orders',
body=payload,
properties=pika.BasicProperties(
delivery_mode=2,
priority=8 # Higher = more important
)
)
High Availability
Quorum Queues (Recommended for HA)
# Raft-based replication - ALWAYS use for critical queues
channel.queue_declare(
'orders.quorum',
durable=True,
arguments={
'x-queue-type': 'quorum',
'x-quorum-initial-group-size': 3, # Replicas
'x-delivery-limit': 5, # Auto-DLQ after 5 redeliveries
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-strategy': 'at-least-once' # Safe DLQ
}
)
Stream Queues (High-throughput, replay)
# Kafka-like streams in RabbitMQ 3.9+
channel.queue_declare(
'events.stream',
durable=True,
arguments={
'x-queue-type': 'stream',
'x-max-length-bytes': 20_000_000_000, # 20GB retention
'x-max-age': '7D', # 7 days retention
'x-stream-max-segment-size-bytes': 500_000_000
}
)
# Consuming from offset
channel.basic_qos(prefetch_count=100)
channel.basic_consume(
'events.stream',
callback,
arguments={
'x-stream-offset': 'first' # first|last|next|timestamp|offset
}
)
Performance Tuning
Prefetch Optimization Formula
optimal_prefetch = (avg_processing_time_ms / avg_network_rtt_ms) * consumer_count * 1.5
Examples:
- Same datacenter (1ms RTT), 50ms processing, 1 consumer: (50/1) * 1 * 1.5 = 75
- Cross-region (50ms RTT), 50ms processing, 1 consumer: (50/50) * 1 * 1.5 = 2
- Batch processing (500ms), local: (500/1) * 1 * 1.5 = 750
Batch Publishing (10x throughput)
# Single publish: ~2000 msg/s
# Batch publish: ~20000+ msg/s
def batch_publish(channel, messages, batch_size=100):
channel.confirm_delivery()
for i in range(0, len(messages), batch_size):
batch = messages[i:i+batch_size]
for msg in batch:
channel.basic_publish(
exchange='batch.exchange',
routing_key=msg['key'],
body=msg['body'],
properties=pika.BasicProperties(delivery_mode=2)
)
# Confirm entire batch
channel.wait_for_confirms(timeout=30)
Memory Management
%% rabbitmq.conf - Production settings
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75
disk_free_limit.absolute = 5GB
%% Queue memory limits
queue_index_embed_msgs_below = 4096
lazy_queue_explicit_gc_run_operation_threshold = 1000
%% Flow control tuning
credit_flow_default_credit = {400, 200}
Monitoring & Alerting
Critical Metrics
# Prometheus alerts - see references/monitoring.md for full config
- alert: RabbitMQHighMemory
expr: rabbitmq_process_resident_memory_bytes / rabbitmq_resident_memory_limit_bytes > 0.8
- alert: RabbitMQQueueBacklog
expr: rabbitmq_queue_messages_ready > 100000
- alert: RabbitMQConsumerUtilization
expr: rabbitmq_queue_consumer_utilisation < 0.5 # Consumers idle = problem
- alert: RabbitMQUnackedMessages
expr: rabbitmq_queue_messages_unacknowledged > 10000
- alert: RabbitMQDiskAlarm
expr: rabbitmq_alarms_free_disk_space_watermark == 1
Health Check Script
# See scripts/health_check.sh for complete implementation
rabbitmqctl node_health_check
rabbitmqctl cluster_status
rabbitmq-diagnostics check_port_connectivity
rabbitmq-diagnostics check_running
rabbitmq-diagnostics check_local_alarms
Anti-Patterns to Avoid
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Connection per message | 1000x overhead | Connection pool |
| No prefetch (unlimited) | Memory explosion | Tune prefetch_count |
auto_ack=True |
Message loss | Manual ack after processing |
| Classic queues for HA | Split-brain risk | Use Quorum queues |
| Polling with basic_get | CPU waste, latency | Use basic_consume |
| Giant messages (>128KB) | Memory pressure | External storage + reference |
| No message TTL | Queue bloat | Set x-message-ttl |
| Unbounded queue growth | Disk/memory full | x-max-length + overflow policy |
File Reference
scripts/connection_pool.py- Production-grade connection poolingscripts/async_publisher.py- High-throughput async publisher with confirmsscripts/consumer_template.py- Robust consumer with retry logicscripts/health_check.sh- Comprehensive health check scriptscripts/queue_migrate.py- Zero-downtime queue migration toolscripts/dlq_processor.py- Dead letter queue reprocessingreferences/patterns.md- Advanced messaging patterns deep-divereferences/clustering.md- HA clustering configurationreferences/security.md- Security hardening guidereferences/monitoring.md- Full monitoring setupreferences/troubleshooting.md- Problem diagnosis guidereferences/performance.md- Performance tuning deep-diveassets/rabbitmq.conf- Production configuration templateassets/docker-compose.yml- Development cluster setupassets/k8s/- Kubernetes deployment manifests
You Might Also Like
Related Skills

create-pr
Creates GitHub pull requests with properly formatted titles that pass the check-pr-title CI validation. Use when creating PRs, submitting changes for review, or when the user says /pr or asks to create a pull request.
n8n-io
electron-chromium-upgrade
Guide for performing Chromium version upgrades in the Electron project. Use when working on the roller/chromium/main branch to fix patch conflicts during `e sync --3`. Covers the patch application workflow, conflict resolution, analyzing upstream Chromium changes, and proper commit formatting for patch fixes.
electron
pr-creator
Use this skill when asked to create a pull request (PR). It ensures all PRs follow the repository's established templates and standards.
google-gemini
clawdhub
Use the ClawdHub CLI to search, install, update, and publish agent skills from clawdhub.com. Use when you need to fetch new skills on the fly, sync installed skills to latest or a specific version, or publish new/updated skill folders with the npm-installed clawdhub CLI.
moltbot
tmux
Remote-control tmux sessions for interactive CLIs by sending keystrokes and scraping pane output.
moltbot
create-pull-request
Create a GitHub pull request following project conventions. Use when the user asks to create a PR, submit changes for review, or open a pull request. Handles commit analysis, branch management, and PR creation using the gh CLI tool.
cline