客户端连接恢复和故障转移

RabbitMQ 客户端恢复属于应用的责任。broker 可以接受重新连接,但应用仍然需要适当的端点选择、重试、确认以及幂等性行为。

使用本指南可增强生产者和消费者在节点重启、滚动升级、瞬时网络故障以及站点级端点变更场景下的抗故障能力。

设计原则

在生产环境客户端中使用以下设计原则:

  • 当客户端库支持多个 broker 地址时,配置多个地址。
  • 将生产者和消费者连接分离,避免一个流被阻塞时影响另一个流。
  • 使用 heartbeat 和合理的连接超时。
  • 当客户端库提供自动恢复功能时,启用它。
  • 对生产者使用发布确认。
  • 对消费者使用手动确认和幂等处理。
  • 即使启用了节点级自动恢复,也要将站点级故障转移视为应用配置变更。

Java 示例

RabbitMQ Java client 支持自动连接恢复和拓扑恢复:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("<username>");
factory.setPassword("<password>");
factory.setVirtualHost("/");
factory.setAutomaticRecoveryEnabled(true);
factory.setTopologyRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setRequestedHeartbeat(30);
factory.setConnectionTimeout(10000);

Address[] addresses = {
    new Address("<ip-1>", <port-1>),
    new Address("<ip-2>", <port-2>),
    new Address("<ip-3>", <port-3>)
};

Connection connection = factory.newConnection(addresses);
Channel channel = connection.createChannel();
channel.confirmSelect();

在生产者 channel 上使用发布确认,并在应用代码中检查确认失败。

Python 示例

使用 pika 时,显式实现重连循环,并在连接失败后重新创建 channel 或 consumer:

import pika
import random
import time

def process_message(body):
    print(f"Processing {body!r}")

def on_message(channel, method, properties, body):
    try:
        process_message(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

credentials = pika.PlainCredentials("<username>", "<password>")
endpoints = [
    pika.ConnectionParameters("<ip-1>", <port-1>, "/", credentials, heartbeat=30),
    pika.ConnectionParameters("<ip-2>", <port-2>, "/", credentials, heartbeat=30),
    pika.ConnectionParameters("<ip-3>", <port-3>, "/", credentials, heartbeat=30),
]

while True:
    try:
        random.shuffle(endpoints)
        connection = pika.BlockingConnection(endpoints)
        channel = connection.channel()
        channel.basic_qos(prefetch_count=50)
        channel.basic_consume(
            queue="orders",
            on_message_callback=on_message,
            auto_ack=False,
        )
        channel.start_consuming()
    except pika.exceptions.AMQPConnectionError:
        time.sleep(5)
        continue

如果你的 consumer 依赖已声明的拓扑,请根据客户端库的行为,在重新连接后重新创建或验证拓扑。

端点故障转移策略

使用以下端点策略之一:

策略适用场景说明
客户端中配置多个 broker 地址应用可以接收多个地址并在这些地址之间轮换。适用于单个集群内的节点级故障转移。
Kubernetes Service 或 负载均衡器 地址客户端应使用一个稳定地址。简化配置,但要确保服务类型与流量来源匹配。
在主站点和 DR 端点之间切换配置你需要站点级故障转移。结合有文档记录的运维手册,以及应用重启或重新加载逻辑使用。

单个集群内的节点间自动恢复不会自动将应用切换到不同的 DR 站点。站点故障转移仍然需要配置、DNS 或服务发现变更。

验证清单

在更改客户端恢复设置后,验证以下内容:

  • 应用可以连接到多个 broker 地址。
  • 生产者使用确认,并且在发布未被接受时会明确失败。
  • 消费者使用手动确认,并且可以干净地重启。
  • 重连循环不会创建重复的 consumer 注册。
  • 对于客户端可使用的每个端点,TLS 设置都保持有效。

相关信息