```xml
<!-- etc/communication.xml -->
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="vendor.tax.audit.generate"
           request="Vendor\TaxCompliance\Api\Data\AuditPayloadInterface"/>
</config>
```

### Step 3: Wire the Publisher and Topology
Separate publisher configuration from exchange routing. This abstraction allows swapping message brokers without modifying business logic.
```xml
<!-- etc/queue_publisher.xml -->
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="vendor.tax.audit.generate">
        <connection name="amqp" exchange="vendor.tax.exchange" disabled="false"/>
        <connection name="db" exchange="vendor.tax.db.exchange" disabled="true"/>
    </publisher>
</config>
```

```xml
<!-- etc/queue_topology.xml -->
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="vendor.tax.exchange" type="topic" connection="amqp">
        <binding id="auditRouting"
                 topic="vendor.tax.audit.generate"
                 destinationType="queue"
                 destination="vendor.tax.audit.queue"/>
    </exchange>
</config>
```

### Step 4: Register the Consumer Handler

Define how the queue invokes processing logic. Treat the `maxMessages` attribute as mandatory in production. A long-running PHP process accumulates object manager metadata and cached instances over time; recycling the consumer after a fixed message count keeps memory usage bounded and predictable.

```xml
<!-- etc/queue_consumer.xml -->
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="taxAuditProcessor"
              queue="vendor.tax.audit.queue"
              connection="amqp"
              consumerInstance="Magento\Framework\MessageQueue\Consumer"
              handler="Vendor\TaxCompliance\Model\AuditExecutor::process"
              maxMessages="5000"/>
</config>
```
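
To illustrate why a message cap bounds memory, here is a minimal, framework-free sketch of a consumer loop that stops after a fixed count so that a supervisor can start a fresh process. `runConsumerLoop` and its parameters are hypothetical names for illustration only; this is not how Magento's consumer is implemented internally.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for a consumer loop (not a Magento API).
 * Processes messages until the source is exhausted or $maxMessages is reached.
 *
 * @param iterable<mixed> $messages Messages pulled from a queue
 * @param callable $handler Business logic for a single message
 * @return int Number of messages processed before the loop stopped
 */
function runConsumerLoop(iterable $messages, callable $handler, int $maxMessages): int
{
    $processed = 0;
    foreach ($messages as $message) {
        if ($processed >= $maxMessages) {
            break; // Stop here; the process exits and a supervisor restarts it
        }
        $handler($message);
        $processed++;
    }

    return $processed;
}

// Ten messages are available, but the loop is capped at three.
$seen = [];
$count = runConsumerLoop(
    range(1, 10),
    function (int $id) use (&$seen): void {
        $seen[] = $id;
    },
    3
);
echo "Processed {$count} messages\n";
```

Because the process ends after the cap, whatever memory it accumulated is returned to the operating system, which is the effect `maxMessages` relies on.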

### Step 5: Implement the Consumer Handler

Handlers must be stateless, idempotent, and resilient to transient network failures. Distinguish between retryable errors and fatal exceptions to prevent queue poisoning.
```php
namespace Vendor\TaxCompliance\Model;

use Magento\Framework\Exception\LocalizedException;
use Psr\Log\LoggerInterface;
use Vendor\TaxCompliance\Api\ComplianceGatewayInterface;
use Vendor\TaxCompliance\Api\Data\AuditPayloadInterface;

class AuditExecutor
{
    public function __construct(
        private readonly ComplianceGatewayInterface $gateway,
        private readonly LoggerInterface $logger
    ) {}

    public function process(AuditPayloadInterface $payload): void
    {
        try {
            $this->gateway->submitAuditRecord(
                $payload->getInvoiceId(),
                $payload->getStoreViewCode(),
                $payload->getTransactionHash()
            );
        } catch (LocalizedException $e) {
            $this->logger->critical(
                'Compliance submission failed for invoice {invoiceId}',
                ['invoiceId' => $payload->getInvoiceId(), 'error' => $e->getMessage()]
            );
            throw $e; // Triggers Magento's retry/DLX mechanism
        } catch (\Throwable $e) {
            $this->logger->error(
                'Non-retryable compliance error for invoice {invoiceId}',
                ['invoiceId' => $payload->getInvoiceId(), 'trace' => $e->getTraceAsString()]
            );
            // Swallow non-critical errors to prevent queue poisoning
        }
    }
}
```
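
The routing rule in the handler can be exercised without Magento. The sketch below is a simplified illustration under stated assumptions: `TransientGatewayException` and `processWithRouting` are hypothetical names, and the exception class merely stands in for whatever failure type a project treats as retryable.

```php
<?php
declare(strict_types=1);

/** Hypothetical marker for failures worth retrying (timeouts, 5xx responses). */
final class TransientGatewayException extends RuntimeException
{
}

/**
 * Runs $submit and applies the routing rule: transient failures propagate so
 * the consumer can reject the message; anything else is logged and dropped.
 */
function processWithRouting(callable $submit, callable $log): void
{
    try {
        $submit();
    } catch (TransientGatewayException $e) {
        $log('retryable: ' . $e->getMessage());
        throw $e;
    } catch (Throwable $e) {
        $log('dropped: ' . $e->getMessage());
    }
}

$logLines = [];
$log = function (string $line) use (&$logLines): void {
    $logLines[] = $line;
};

// A permanent failure is swallowed, so the message would be acknowledged.
processWithRouting(function (): void {
    throw new InvalidArgumentException('bad payload');
}, $log);

// A transient failure escapes the handler.
$rethrown = false;
try {
    processWithRouting(function (): void {
        throw new TransientGatewayException('timeout');
    }, $log);
} catch (TransientGatewayException $e) {
    $rethrown = true;
}
```

The order of the `catch` blocks matters: the narrower retryable type must come first, otherwise the `Throwable` branch would absorb it.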

### Step 6: Trigger Publication via Plugin

Intercept the business event and publish asynchronously. Never block the request lifecycle.
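```php
namespace Vendor\TaxCompliance\Plugin;

use Magento\Framework\MessageQueue\PublisherInterface;
use Magento\Sales\Api\Data\InvoiceInterface;
use Magento\Sales\Api\InvoiceRepositoryInterface;
use Magento\Store\Model\StoreManagerInterface;
use Vendor\TaxCompliance\Api\Data\AuditPayloadInterfaceFactory;

class InvoiceAuditTrigger
{
    public function __construct(
        private readonly PublisherInterface $publisher,
        private readonly AuditPayloadInterfaceFactory $payloadFactory,
        private readonly StoreManagerInterface $storeManager
    ) {}

    /**
     * Runs after InvoiceRepositoryInterface::save(), once the entity ID exists.
     * InvoiceManagementInterface exposes no register() method, and the invoice
     * model's register() runs before the first save, so neither yields an ID.
     * Assumption: invoices are persisted through the repository; code that saves
     * the model directly bypasses this plugin. save() also fires on updates, so
     * the handler must tolerate repeated messages for the same invoice.
     */
    public function afterSave(
        InvoiceRepositoryInterface $subject,
        InvoiceInterface $result
    ): InvoiceInterface {
        $payload = $this->payloadFactory->create();
        $payload->setInvoiceId((int)$result->getEntityId());
        // InvoiceInterface has no getStore(); resolve the code from the store ID.
        $payload->setStoreViewCode($this->storeManager->getStore($result->getStoreId())->getCode());
        $payload->setTransactionHash(bin2hex(random_bytes(16)));
        $payload->setAuditTimestamp(date('c'));

        $this->publisher->publish('vendor.tax.audit.generate', $payload);

        return $result;
    }
}
```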
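
One way to keep a broker outage from failing the surrounding request is to guard the publish call. The following is a minimal sketch under stated assumptions: `publishSafely` is a hypothetical helper, and the `$publish` callable stands in for a real publisher.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical guard around a publish call (not a Magento API).
 * Returns true on success and false if the publisher threw.
 *
 * @param callable $publish Receives the topic name and the message
 * @param array<string, mixed> $message
 * @param callable $log Receives a single log line
 */
function publishSafely(callable $publish, string $topic, array $message, callable $log): bool
{
    try {
        $publish($topic, $message);
        return true;
    } catch (Throwable $e) {
        $log(sprintf('Publish to %s failed: %s', $topic, $e->getMessage()));
        return false;
    }
}

$errors = [];
$log = function (string $line) use (&$errors): void {
    $errors[] = $line;
};

// Simulated broker outage: the publisher throws, the caller keeps running.
$failingPublisher = function (string $topic, array $message): void {
    throw new RuntimeException('connection refused');
};
$published = publishSafely($failingPublisher, 'vendor.tax.audit.generate', ['invoiceId' => 42], $log);
```

Swallowing the failure trades delivery for availability. A compliance workflow may prefer to persist the payload somewhere durable and republish it later rather than only logging the error.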

**Architecture Rationale:**

- **Publisher/Handler Separation:** Isolates the web request lifecycle from external API latency and network instability.
- **Explicit Schema Registration:** Prevents silent serialization failures and enables framework-level validation before messages enter the broker.
- **`maxMessages` Enforcement:** Guarantees predictable memory usage by forcing periodic process recycling. PHP's garbage collector does not reclaim framework metadata efficiently in long-running processes.
- **Exception Routing:** Throwing `LocalizedException` signals the framework to retry or route to a dead-letter exchange. Catching `\Throwable` prevents unhandled fatal errors from poisoning the queue.

## Pitfall Guide

### 1. Unbounded Memory Growth

**Explanation:** Long-running PHP consumers accumulate cached objects, database connections, and framework metadata. Without process recycling, memory consumption climbs until the OS OOM-killer terminates the process.

**Fix:** Always configure `maxMessages` (1000–10000 depending on payload complexity). Pair with Supervisor or systemd to auto-restart recycled processes. Explicitly call `gc_collect_cycles()` in handlers if processing large datasets.
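
A memory ceiling can complement the message cap. The sketch below assumes a hypothetical `shouldRecycle` helper and an arbitrary 256 MiB limit; both are illustrative choices, not Magento settings.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: true when the worker should stop and be restarted,
 * either because it hit its message cap or because memory crossed a ceiling.
 */
function shouldRecycle(int $processed, int $maxMessages, int $memoryBytes, int $memoryLimitBytes): bool
{
    return $processed >= $maxMessages || $memoryBytes >= $memoryLimitBytes;
}

$processed = 0;
$maxMessages = 1000;
$memoryLimit = 256 * 1024 * 1024; // 256 MiB, an arbitrary example ceiling

foreach (range(1, 5000) as $message) {
    // ... handle $message here ...
    $processed++;

    if ($processed % 100 === 0) {
        gc_collect_cycles(); // Reclaim cyclic garbage between batches
    }
    if (shouldRecycle($processed, $maxMessages, memory_get_usage(true), $memoryLimit)) {
        break; // Exit and let the supervisor start a fresh process
    }
}
```

Calling `gc_collect_cycles()` every batch rather than every message keeps the collection overhead small while still releasing circular references regularly.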

### 2. Non-Idempotent Handler Logic

**Explanation:** Message queues guarantee at-least-once delivery. If a handler performs a non-idempotent action (e.g., charging a payment, incrementing a counter) and the consumer crashes after execution but before acknowledgment, the message is redelivered, causing duplicate side effects.

**Fix:** Implement idempotency keys or database constraints. Check for existing records before processing. Design handlers to be safely re-executable without altering business state.
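
The idempotency-key approach can be sketched as follows. `ProcessedKeyStore` is a hypothetical in-memory stand-in for a table with a unique constraint, and deriving the key from the invoice ID and store view code is an assumption made for this example.

```php
<?php
declare(strict_types=1);

/** In-memory stand-in for a table with a UNIQUE constraint on the key column. */
final class ProcessedKeyStore
{
    /** @var array<string, true> */
    private array $keys = [];

    /** Returns true only the first time a given key is claimed. */
    public function claim(string $key): bool
    {
        if (isset($this->keys[$key])) {
            return false;
        }
        $this->keys[$key] = true;

        return true;
    }
}

/** Deterministic key: the same invoice and store always map to the same value. */
function idempotencyKey(int $invoiceId, string $storeViewCode): string
{
    return hash('sha256', $invoiceId . '|' . $storeViewCode);
}

$store = new ProcessedKeyStore();
$submissions = 0;

$handle = function (int $invoiceId, string $storeViewCode) use ($store, &$submissions): void {
    if (!$store->claim(idempotencyKey($invoiceId, $storeViewCode))) {
        return; // Redelivery: the side effect already happened
    }
    $submissions++; // Stand-in for the external submission
};

$handle(1001, 'default');
$handle(1001, 'default'); // Simulated redelivery of the same message
$handle(1002, 'default');
```

In a real handler the claim and the side effect need to succeed or fail together, for example by recording the key in the same database transaction as the business write. Otherwise a crash between the two steps can lose the message.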

### 3. Topology Binding Mismatches

**Explanation:** Declaring a topic in `communication.xml` but failing to bind it correctly in `queue_topology.xml` results in messages being published to an exchange with no routing path. Messages disappear silently, causing data loss.

**Fix:** Verify bindings match topic names exactly. Use `bin/magento queue:consumers:list` and the RabbitMQ management UI to confirm queue-exchange connections before deployment. Implement integration tests that publish and consume a test message.
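
An exact-match check of this kind is easy to automate. The sketch below assumes the topic names have already been extracted from the XML files into plain arrays; `findUnboundTopics` is a hypothetical helper, and it deliberately ignores the `*` and `#` wildcards that topic exchanges also accept.

```php
<?php
declare(strict_types=1);

/**
 * @param string[] $declaredTopics Topic names from communication.xml
 * @param string[] $boundTopics Topic attributes of bindings in queue_topology.xml
 * @return string[] Declared topics that have no exactly matching binding
 */
function findUnboundTopics(array $declaredTopics, array $boundTopics): array
{
    return array_values(array_diff($declaredTopics, $boundTopics));
}

$declared = ['vendor.tax.audit.generate'];
$bound = ['vendor.tax.audit.generated']; // Typo: trailing "d"

$unbound = findUnboundTopics($declared, $bound);
foreach ($unbound as $topic) {
    echo "No binding found for topic: {$topic}\n";
}
```

Running a check like this in CI catches a misspelled binding before any message is published to an exchange that cannot route it.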

### 4. Ignoring Dead Letter Exchanges (DLX)

**Explanation:** Failed messages that exceed retry limits are dropped by default. This causes silent data loss in critical workflows like inventory updates, payment webhooks, or compliance logging.

**Fix:** Configure a DLX in `queue_topology.xml` to route exhausted messages to a separate queue. Set up automated alerting or a manual inspection dashboard for DLQ contents. Never treat the DLQ as a black hole.

### 5. Consumer Starvation vs. Overload

**Explanation:** Running too many parallel consumers against a single queue or database table creates lock contention, connection pool exhaustion, and CPU thrashing. Conversely, too few consumers cause message backlog and SLA violations.

**Fix:** Start with 1–2 consumers per queue. Scale horizontally only after monitoring queue depth, processing rate, and database lock waits. Use RabbitMQ's `prefetch_count` to throttle delivery and prevent consumer overload.
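
Once arrival rate and processing time have been measured, a rough consumer count follows from multiplying the two (Little's law). The helper below is a hypothetical back-of-the-envelope sketch; it ignores lock contention and broker limits, so treat the result as a starting point to validate against monitoring, not a target.

```php
<?php
declare(strict_types=1);

/**
 * Estimates how many consumers are busy on average:
 * arrival rate (messages/second) x average handling time (seconds/message).
 * Always returns at least one consumer.
 */
function estimateConsumers(float $arrivalsPerSecond, float $secondsPerMessage): int
{
    return max(1, (int) ceil($arrivalsPerSecond * $secondsPerMessage));
}

// 40 messages/second at 0.25 seconds each keeps about 10 consumers busy.
$busyQueue = estimateConsumers(40.0, 0.25);

// A trickle of traffic still needs one consumer.
$quietQueue = estimateConsumers(0.5, 0.25);
```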

### 6. Database-Backed Queues in Production

**Explanation:** MySQL queues rely on polling and row-level locking. Under sustained load, table scans increase, connection pools saturate, and throughput caps at ~200 msgs/sec. This creates a bottleneck that defeats the purpose of async processing.

**Fix:** Reserve MySQL queues for development or low-volume internal tasks. Route production workloads to RabbitMQ or a managed message broker. Configure connection pooling and keep-alive settings to prevent TCP exhaustion.

### 7. Missing Process Supervision

**Explanation:** Magento's queue consumers are CLI processes. If they crash due to an unhandled exception or network partition, they remain dead until manually restarted, causing silent pipeline failures.

**Fix:** Deploy consumers under Supervisor or systemd. Configure auto-restart policies, log rotation, and health checks. Monitor process uptime and queue depth via Prometheus/Grafana or Datadog.

## Production Bundle

### Action Checklist

### Decision Matrix

| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Development/Testing | MySQL Queue Backend | Zero infrastructure overhead, easy local setup | Low (infrastructure) / High (scaling limits) |
| Low-Volume Internal Tasks | MySQL Queue Backend | Simplifies deployment, acceptable throughput <50 msgs/sec | Low |
| Customer-Facing Async Workflows | RabbitMQ Backend | High throughput, prefetch control, DLX routing, horizontal scaling | Medium (broker hosting) / Low (operational risk) |
| Multi-Region/Global Deployments | Managed RabbitMQ or AWS SQS | Cross-region replication, managed scaling, reduced ops overhead | High (managed service) / Low (maintenance) |
| Strict Compliance/Audit Trails | RabbitMQ + DLX + Persistent Storage | Guaranteed delivery, auditability, dead-letter inspection | Medium |

### Configuration Template

**Supervisor Configuration for Process Management**

```ini
[program:magento-tax-audit]
command=/usr/bin/php /var/www/html/bin/magento queue:consumers:start taxAuditProcessor --max-messages=5000
directory=/var/www/html
user=www-data
autostart=true
autorestart=true
startretries=3
stderr_logfile=/var/log/magento/tax-audit.err.log
stdout_logfile=/var/log/magento/tax-audit.out.log
numprocs=2
process_name=%(program_name)s_%(process_num)02d
```

**Magento Queue Topology with DLX**

```xml
<!-- etc/queue_topology.xml -->
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="vendor.tax.exchange" type="topic" connection="amqp">
        <binding id="auditRouting"
                 topic="vendor.tax.audit.generate"
                 destinationType="queue"
                 destination="vendor.tax.audit.queue">
            <!-- Queue argument: rejected messages are forwarded to the DLX -->
            <arguments>
                <argument name="x-dead-letter-exchange" xsi:type="string">vendor.tax.dlx</argument>
            </arguments>
        </binding>
    </exchange>
    <exchange name="vendor.tax.dlx" type="direct" connection="amqp">
        <!-- Dead-lettered messages keep their original routing key -->
        <binding id="dlxRouting"
                 topic="vendor.tax.audit.generate"
                 destinationType="queue"
                 destination="vendor.tax.audit.dlq"/>
    </exchange>
</config>
```

### Quick Start Guide

- **Define the Payload Interface:** Create a PHP interface that follows Magento's data contract conventions. Ensure all getters return scalar or serializable types.
- **Register Schema & Topology:** Add `communication.xml`, `queue_publisher.xml`, and `queue_topology.xml` to your module's `etc/` directory. Verify exchange bindings match topic names exactly.
- **Deploy the Consumer:** Run `bin/magento setup:upgrade` to register the consumer. Start it manually with `bin/magento queue:consumers:start [consumer_name] --max-messages=1000` to validate routing.
- **Implement Handler Logic:** Write a stateless handler class. Inject dependencies via the constructor. Implement explicit exception routing: throw `LocalizedException` for retries, catch `\Throwable` for fatal errors.
- **Supervise & Monitor:** Configure Supervisor or systemd to manage the consumer process. Set up queue depth monitoring and DLQ alerting. Scale consumers horizontally only after validating throughput metrics.