ge: rabbitmq:4.0.5-management
    ports: ["5672:5672", "15672:15672"]
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD:-secret}
      RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: "-rabbit log_levels [{connection, warning}]"
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "check_port_connectivity"]
      interval: 10s
      timeout: 5s
      retries: 5
    deploy:
      resources:
        limits: { memory: 2G }
  kafka:
    image: confluentinc/cp-kafka:7.8.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      # ZooKeeper mode: cp-kafka refuses to start without KAFKA_ZOOKEEPER_CONNECT
      # unless KRaft (KAFKA_PROCESS_ROLES) is configured instead.
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      # Reachable from the host only; containers on the Compose network need another listener.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 15s
      timeout: 10s
      retries: 5
    deploy:
      resources:
        limits: { memory: 4G }
  zookeeper:
    image: confluentinc/cp-zookeeper:7.8.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    deploy:
      resources:
        limits: { memory: 1G }
```
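Compose healthchecks only gate container start order through `depends_on` with `condition: service_healthy`; a script launched on the host does not wait for them. A minimal sketch of a host-side readiness wait, using only the standard library, follows. `wait_for_port` is a hypothetical helper, and an open TCP port is a weaker signal than the healthchecks above (the listener can be up before the broker has finished booting).

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once host:port accepts a TCP connection, False after `timeout` seconds.

    Hypothetical helper: an accepted connection only proves the listener is up,
    so treat it as a lower bound on broker readiness.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Connect and immediately close; we only care that the port answers.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, calling `wait_for_port("localhost", 9092, timeout=120)` before `stream.init_consumer()` avoids a burst of connection errors while the broker starts.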
### 2. RabbitMQ: Work Dispatch Pattern (Python 3.12 + pika 1.3.2)
```python
# rabbitmq_worker.py
import json
import logging
import signal

import pika

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


class RabbitMQWorker:
    def __init__(self, url: str, queue: str):
        self.url = url
        self.queue = queue
        self.connection = None
        self.channel = None
        self._stopping = False
        # Handlers only set a flag; consume() performs the shutdown, so no pika I/O
        # runs inside a signal handler.
        signal.signal(signal.SIGINT, self._shutdown)
        signal.signal(signal.SIGTERM, self._shutdown)

    def connect(self):
        params = pika.URLParameters(self.url)
        params.heartbeat = 600
        params.blocked_connection_timeout = 300
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        # Durable queue survives broker restarts (messages must also be published as persistent).
        self.channel.queue_declare(queue=self.queue, durable=True)
        # At most one unacknowledged message per worker: fair dispatch across workers.
        self.channel.basic_qos(prefetch_count=1)
        logger.info("Connected to RabbitMQ. Waiting for messages...")

    def callback(self, ch, method, properties, body):
        try:
            task = json.loads(body)
            logger.info("Processing task: %s", task)
            # Simulate work here, e.g. time.sleep(0.1)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            logger.error("Processing failed: %s", e)
            # requeue=False drops the message, or dead-letters it if the queue has a DLX.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    def consume(self):
        self.channel.basic_consume(queue=self.queue, on_message_callback=self.callback)
        try:
            # Dispatch deliveries in 1 s slices so the stop flag is checked regularly.
            while not self._stopping:
                self.connection.process_data_events(time_limit=1)
        finally:
            # Closing the connection returns any unacked message to the queue.
            if self.connection and self.connection.is_open:
                self.connection.close()
            logger.info("Shut down gracefully.")

    def _shutdown(self, signum, frame):
        logger.info("Shutdown requested (signal %s)...", signum)
        self._stopping = True


if __name__ == "__main__":
    worker = RabbitMQWorker(url="amqp://admin:secret@localhost:5672/", queue="task_queue")
    worker.connect()
    worker.consume()
```

### 3. Kafka: Event Streaming Pattern (Python 3.12 + confluent-kafka 2.4.0)

```python
# kafka_stream.py
import json
import logging
import signal

from confluent_kafka import Consumer, KafkaError, Producer

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


class KafkaStream:
    def __init__(self, broker: str, group: str, topic: str):
        self.broker = broker
        self.group = group
        self.topic = topic
        self.consumer = None
        self.producer = None
        self.running = True
        # Handlers only flip a flag; consume_loop() closes the clients on its way out.
        signal.signal(signal.SIGINT, self._shutdown)
        signal.signal(signal.SIGTERM, self._shutdown)

    def init_producer(self):
        conf = {
            "bootstrap.servers": self.broker,
            # Idempotence de-duplicates producer retries per partition; it requires acks=all.
            "enable.idempotence": True,
            "acks": "all",
            "linger.ms": 5,
            "batch.size": 65536,
        }
        # No transactional.id: a transaction left open is aborted by the broker after
        # transaction.timeout.ms, and its messages never reach read_committed consumers.
        # Use transactions only if you commit them per batch.
        self.producer = Producer(conf)
        logger.info("Kafka idempotent producer initialized.")

    def init_consumer(self):
        conf = {
            "bootstrap.servers": self.broker,
            "group.id": self.group,
            "auto.offset.reset": "earliest",
            # Offsets are committed manually after processing (at-least-once delivery).
            "enable.auto.commit": False,
            "isolation.level": "read_committed",
            # Maximum gap between poll() calls before the consumer is evicted from the group.
            "max.poll.interval.ms": 300000,
        }
        self.consumer = Consumer(conf)
        self.consumer.subscribe([self.topic])
        logger.info("Kafka consumer initialized. Waiting for events...")

    def produce(self, key: str, value: dict):
        self.producer.produce(self.topic, key=key, value=json.dumps(value).encode("utf-8"))
        # Serve delivery callbacks without blocking.
        self.producer.poll(0)

    def consume_loop(self):
        try:
            while self.running:
                msg = self.consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    # _PARTITION_EOF is only reported when enable.partition.eof=true.
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    logger.error("Consumer error: %s", msg.error())
                    if msg.error().fatal():
                        break
                    continue  # non-fatal errors (e.g. topic not created yet) are retried
                data = json.loads(msg.value().decode("utf-8"))
                logger.info("Received: key=%s, offset=%s, data=%s", msg.key(), msg.offset(), data)
                # Process data...
                # Commit only after processing; a crash before this line replays the message.
                self.consumer.commit(message=msg, asynchronous=False)
        finally:
            self.consumer.close()
            if self.producer:
                self.producer.flush(10)
            logger.info("Kafka stream shut down.")

    def _shutdown(self, signum, frame):
        logger.info("Shutting down Kafka stream...")
        self.running = False


if __name__ == "__main__":
    stream = KafkaStream(broker="localhost:9092", group="analytics-group", topic="events")
    stream.init_producer()
    stream.init_consumer()
    stream.consume_loop()
```

### 4. Execution Steps
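- Save the Compose file as `docker-compose.yml` and run `docker compose up -d`.
- Verify health: `docker compose ps` (`rabbitmq` and `kafka` should report `healthy`; `zookeeper` has no healthcheck, so it only shows as running).
- Install dependencies: `pip install pika==1.3.2 confluent-kafka==2.4.0`.
- Run the RabbitMQ worker: `python rabbitmq_worker.py`.
- Run the Kafka stream: `python kafka_stream.py`.
- Publish test messages through the RabbitMQ management UI (http://localhost:15672) or with `kafka-console-producer.sh` (inside the `cp-kafka` container the script is named `kafka-console-producer`).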
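Instead of clicking through the management UI, test messages can be published from a script through the management plugin's HTTP API (`POST /api/exchanges/{vhost}/{exchange}/publish`, where the default exchange is addressed as `amq.default`). A minimal standard-library sketch that only builds the request follows; `build_publish_request` is a hypothetical helper, and the credentials mirror the Compose defaults. RabbitMQ documents this endpoint as suited to testing, not to high-throughput publishing.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_publish_request(
    queue: str,
    payload: dict,
    base_url: str = "http://localhost:15672",
    vhost: str = "/",
    user: str = "admin",
    password: str = "secret",
) -> urllib.request.Request:
    """Build a POST that publishes `payload` to `queue` via the default exchange."""
    # The vhost must be percent-encoded in the path ("/" becomes "%2F").
    url = f"{base_url}/api/exchanges/{urllib.parse.quote(vhost, safe='')}/amq.default/publish"
    body = {
        "properties": {"delivery_mode": 2},  # persistent, matching the durable queue
        "routing_key": queue,  # the default exchange routes by queue name
        "payload": json.dumps(payload),
        "payload_encoding": "string",
    }
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="POST",
    )
```

Sending it with `urllib.request.urlopen(build_publish_request("task_queue", {"id": 1}))` should return a JSON body with a `routed` flag that tells you whether any queue received the message.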
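## Pitfall Guide

| Symptom | Root Cause | Fix |
|---|---|---|
| Publishers stall, then pika closes the connection with `ConnectionBlockedTimeout` | RabbitMQ memory alarm triggered (`vm_memory_high_watermark`, default 40% of detected RAM); the broker blocks publishing connections and pika gives up after `blocked_connection_timeout` | Increase `vm_memory_high_watermark` or drain the backlog with more consumers. On RabbitMQ 3.12+ classic queues already page to disk and the lazy queue mode is ignored. Monitor `rabbitmq_memory` metrics. |
| `KafkaError{code=_GROUP_COORDINATOR_NOT_AVAILABLE}` | Broker not ready, or `KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR` larger than the number of brokers, so `__consumer_offsets` cannot be created | Keep the replication factor at or below the broker count (1 in this setup). Wait for the healthcheck to pass before connecting. |
| Consumer rebalancing storms | Processing between `poll()` calls exceeds `max.poll.interval.ms`, so the consumer is evicted from the group | Raise `max.poll.interval.ms` above worst-case processing time (the default is 300000 ms). Tune `fetch.min.bytes` and `max.partition.fetch.bytes` to reduce poll frequency. |
| Duplicate messages on restart | Offsets committed on a timer (`enable.auto.commit`) instead of after processing, so a crash replays work done since the last commit | Disable `enable.auto.commit` and commit offsets synchronously after successful processing. A crash between processing and commit can still replay a message, so keep handlers idempotent. |
| Messages silently lost when a broker fails | Publisher confirms not enabled: missing `confirm_delivery()` in pika, or missing `enable.idempotence`/`acks=all` in Kafka | RabbitMQ: `channel.confirm_delivery()`. Kafka: `enable.idempotence: True` + `acks: all`. |
| Disk exhaustion (Kafka) | Retention policy misconfigured or compaction disabled | Set `log.retention.hours` or `log.retention.bytes`. Enable `cleanup.policy=compact` for key-value topics. |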
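For the disk-exhaustion row, a rough upper bound on a topic's disk usage under time-based retention is ingress rate × retention window × replication factor, plus up to about one extra segment per partition replica, because retention only deletes closed segments whose newest record has aged out. A minimal sketch of that estimate with the Compose defaults (`KAFKA_LOG_RETENTION_HOURS: 168`, `KAFKA_LOG_SEGMENT_BYTES: 1073741824`) follows; the function name and ingress figures are illustrative assumptions, and compression, index files, and compaction are ignored.

```python
GIB = 1024 ** 3


def estimate_topic_disk_bytes(
    ingress_bytes_per_sec: float,
    partitions: int,
    replication_factor: int = 1,
    retention_hours: float = 168,
    segment_bytes: int = 1_073_741_824,
) -> float:
    """Rough upper bound on bytes one topic occupies across all brokers.

    Hypothetical helper: data inside the retention window on every replica,
    plus one segment of slack per partition replica awaiting deletion.
    """
    retained = ingress_bytes_per_sec * retention_hours * 3600 * replication_factor
    segment_slack = partitions * replication_factor * segment_bytes
    return retained + segment_slack
```

At 1 MiB/s into a 3-partition topic with replication factor 1, this gives about 594 GiB, which is why retention and disk alerts need to be sized together.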
Diagnostic commands:
- RabbitMQ: `rabbitmq-diagnostics check_port_connectivity`, `rabbitmqctl list_queues name messages memory`
- Kafka: `kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group>`, `kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <topic>` (inside the `cp-kafka` container, drop the `.sh` suffix)
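The `LAG` column of `kafka-consumer-groups.sh --describe` is the number the consumer-lag alert is built on. A minimal sketch that totals it per topic from the command's text output follows, assuming the usual whitespace-separated table whose header row starts with `GROUP` and includes `TOPIC` and `LAG` (column layout can vary between Kafka versions, so treat the parser as an assumption). `total_lag_by_topic` is a hypothetical name.

```python
from collections import defaultdict


def total_lag_by_topic(describe_output: str) -> dict[str, int]:
    """Sum the LAG column per topic from `kafka-consumer-groups --describe` text.

    Rows whose LAG is "-" (no committed offset yet) are skipped; lines before
    the header, such as status messages, are ignored.
    """
    totals: dict[str, int] = defaultdict(int)
    columns: list[str] | None = None
    for line in describe_output.splitlines():
        fields = line.split()
        if not fields:
            continue
        if fields[0] == "GROUP":
            columns = fields  # header row: remember column positions
            continue
        if columns is None or len(fields) != len(columns):
            continue  # not a data row of the table
        row = dict(zip(columns, fields))
        lag = row.get("LAG", "-")
        if lag != "-":
            totals[row["TOPIC"]] += int(lag)
    return dict(totals)
```

The text can come from `subprocess.run([...], capture_output=True, text=True).stdout`; a metrics exporter is the better long-term source, but this is handy during an incident.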
## Production Bundle

### Deployment Checklist

### Monitoring & Alerting
- RabbitMQ: `rabbitmq_queue_messages_ready`, `rabbitmq_mem_alarm`, `rabbitmq_disk_free_alarm`, `rabbitmq_connection_count`
- Kafka: `kafka_server_BrokerTopicMetrics_MessagesInPerSec`, `kafka_consumer_fetch_manager_records_lag`, `kafka_controller_ActiveControllerCount`, `kafka_log_LogFlushRateAndTimeMs`
- Alert thresholds: queue depth > 100k (RabbitMQ), consumer lag > 5000 offsets for 5m (Kafka), memory usage > 75%, disk usage > 80%.
### Scaling Strategy
- RabbitMQ: Scale vertically until memory/disk alarms. Use Shovel/Federation for cross-region, or queue sharding via consistent hashing. Avoid single-queue bottlenecks by partitioning work across multiple queues.
- Kafka: Scale horizontally by increasing partitions. Rebalance partitions during low-traffic windows. Use tiered storage for cost-effective long-term retention. Monitor partition skew to prevent hot brokers.
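Queue sharding via consistent hashing maps each routing key to one of N queues so that adding a queue moves only a fraction of keys, whereas `hash(key) % N` would remap almost all of them. RabbitMQ provides this server-side through the `rabbitmq_consistent_hash_exchange` plugin; the sketch below is a client-side hash ring for illustration only, with hypothetical queue names and an arbitrary 100 virtual nodes per queue.

```python
import bisect
import hashlib


class HashRing:
    """Map routing keys to queue names with consistent hashing and virtual nodes."""

    def __init__(self, queues: list[str], vnodes: int = 100):
        # Each queue owns `vnodes` points on the ring to even out key distribution.
        self._ring: list[tuple[int, str]] = sorted(
            (self._hash(f"{queue}#{i}"), queue) for queue in queues for i in range(vnodes)
        )
        self._points = [point for point, _ in self._ring]

    @staticmethod
    def _hash(value: str) -> int:
        # First 8 bytes of SHA-256 as a stable, well-distributed 64-bit integer.
        return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")

    def queue_for(self, routing_key: str) -> str:
        # First ring point clockwise from the key's hash, wrapping past the end.
        idx = bisect.bisect(self._points, self._hash(routing_key)) % len(self._points)
        return self._ring[idx][1]
```

Because a key always lands on the same queue, per-key ordering is preserved within that queue; when a queue is added, only keys whose nearest ring point now belongs to the new queue move.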
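### Cost Optimization
- Avoid exactly-once semantics (Kafka transactions) unless audit or compliance requirements demand them. Application-level idempotency (e.g., database unique constraints, Redis dedup keys) is 40-60% cheaper than broker-enforced transactions.
- Keep high-volume, low-priority RabbitMQ messages on disk rather than in RAM. On RabbitMQ 3.12+ (including the 4.0 image above), classic queues already do this by default and the `lazy` queue mode is ignored; on older versions, declare such queues as lazy queues.
- Enable Kafka log compaction for stateful topics to cap storage growth while keeping the latest value per key replayable.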
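Application-level idempotency usually means recording a unique event ID in the same store as the side effect and skipping the work when the ID is already there. A minimal sketch follows, with the standard-library `sqlite3` module standing in for the application database; the `processed_events` and `balances` tables and the `event_id`/`account`/`amount` fields are hypothetical. The dedup row and the side effect commit in one transaction, so a crash cannot record one without the other.

```python
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS processed_events (event_id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS balances (account TEXT PRIMARY KEY, amount INTEGER NOT NULL)"
    )
    return conn


def apply_event(conn: sqlite3.Connection, event: dict) -> bool:
    """Apply `event` once; return False if its event_id was already processed."""
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)", (event["event_id"],)
        )
        if cur.rowcount == 0:
            return False  # duplicate delivery: the primary key already exists
        conn.execute(
            "INSERT INTO balances (account, amount) VALUES (?, ?) "
            "ON CONFLICT(account) DO UPDATE SET amount = amount + excluded.amount",
            (event["account"], event["amount"]),
        )
        return True
```

In the Kafka consumer, calling such a handler before `consumer.commit(...)` makes a replay after a crash harmless; when producers do not supply an ID, `f"{msg.topic()}:{msg.partition()}:{msg.offset()}"` identifies a message uniquely.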