配置死信交换器和重试队列

使用死信交换器(DLX)在收到否定确认、过期或队列溢出事件后,将消息移出主处理路径。当工作负载需要延迟第二次或后续投递尝试时,将 DLX 与重试队列结合使用。

本指南将两个关注点分开说明:

  • 死信队列(DLQ):用于处理应停止正常处理的消息。
  • 重试队列:用于在延迟后返回工作队列的消息。

适用场景

当你需要以下一种或多种能力时,请使用此模式:

  • 将永久失败的消息放入 DLQ 以便检查。
  • 延迟重试,而不是立即重新入队失败消息。
  • 保持主队列不包含反复失败的消息。

不要依赖带有 requeue=true 的无限 basic.nackbasic.reject 循环。它们会形成紧密的重新投递循环,并使队列健康状况更难控制。

操作步骤

在此示例中:

  • 生产者将工作消息发布到 orders.work
  • 消费者从 orders.q 读取消息。
  • 永久失败的消息通过 orders.dlx 进入 orders.dlq
  • 短暂失败的消息由原生应用重新发布到 orders.retry
  • orders.retry.30s 将重试消息保留 30 秒,然后再将它们死信回 orders.work

1. 声明交换器

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  declare exchange name=orders.work type=direct durable=true

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  declare exchange name=orders.retry type=direct durable=true

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  declare exchange name=orders.dlx type=direct durable=true

2. 声明工作队列和 DLQ

工作队列会使用路由键 orders.failed 将失败消息死信到 orders.dlx

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  declare queue \
  name=orders.q \
  durable=true \
  arguments='{"x-dead-letter-exchange":"orders.dlx","x-dead-letter-routing-key":"orders.failed"}'

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  declare queue name=orders.dlq durable=true

绑定队列:

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  declare binding \
  source=orders.work \
  destination_type=queue \
  destination=orders.q \
  routing_key=orders

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  declare binding \
  source=orders.dlx \
  destination_type=queue \
  destination=orders.dlq \
  routing_key=orders.failed

当消费者以 requeue=false 拒绝来自 orders.q 的消息时,RabbitMQ 会将该消息死信到 orders.dlq

3. 声明重试队列

创建一个将重新投递延迟 30 秒的重试队列:

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  declare queue \
  name=orders.retry.30s \
  durable=true \
  arguments='{"x-message-ttl":30000,"x-dead-letter-exchange":"orders.work","x-dead-letter-routing-key":"orders"}'

绑定重试队列:

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  declare binding \
  source=orders.retry \
  destination_type=queue \
  destination=orders.retry.30s \
  routing_key=orders.30s

4. 实现原生应用的重试决策

使用原生应用逻辑判断失败是短暂的还是永久的:

  • 对于短暂失败,将消息以路由键 orders.30s 发布到 orders.retry
  • 对于永久失败,以 requeue=falseorders.q 拒绝该消息,或者显式将其发布到 DLQ 路径。

当消费者将短暂失败重新发布到 orders.retry 时,应仅在重试发布成功后再确认原始投递。在生产环境中,请对重试发布路径使用发布者确认,这样消费者不会在 RabbitMQ 接受重试副本之前删除原始消息。

如果重试发布失败,或者未收到确认,请不要对原始投递执行正向确认。请根据你的失败策略进行重新入队或重试。

即使使用发布者确认,在重连或部分失败期间仍然可能出现重复消息。消费者和下游处理程序应保持幂等。

RabbitMQ 不为 classic queue 提供通用的内置重试计数器。如果你需要最大重试次数,请使用原生应用逻辑、检查 x-death 头,或者在合适的情况下采用 quorum queue 投递限制。

5. 验证拓扑

验证交换器、队列和绑定:

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  list exchanges name type

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  list queues name arguments messages consumers

rabbitmqadmin \
  --host <management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  list bindings source_name destination_name routing_key

推荐实践

  • 使用专用 DLQ 进行检查和回放。
  • 使用延迟重试队列,而不是立即重新入队循环。
  • 保持消费者幂等,因为重试和故障切换可能会产生重复投递。
  • 由原生应用决定消息何时应停止重试。
  • 监控 DLQ 的增长。DLQ 持续增长通常表示代码、模式或依赖项存在问题。

相关信息