使用 Federated Exchanges 实现 RabbitMQ 灾备
本文介绍了一种基于 federated exchanges 的 RabbitMQ 灾备(DR)模式。在该模式中,主 RabbitMQ 实例或集群会将业务消息发布到 upstream exchange,而备用 DR RabbitMQ 实例或集群则使用一个从 upstream exchange federate 而来的 downstream exchange。
Federated exchanges 可为选定的消息流提供跨集群、异步的消息复制。它们适用于 warm-standby 设计、区域分发以及选择性复制。它们不是同步高可用机制,也不能替代应用故障切换、定期备份、RabbitMQ definitions 管理或持久队列设计。
目录
适用场景DR 模式的工作方式前提条件启用 Federation 插件为 DR 就绪准备 RabbitMQ Definitions操作步骤1. 准备 primary exchange2. 准备 DR exchange、queue 和 binding3. 定义 DR backlog 保留窗口4. 在 DR 集群上配置 federation upstream5. 为 downstream exchange 应用 federation policy6. 向 primary exchange 发布测试消息7. 验证 DR 侧接收到复制消息8. 准备应用故障切换流程验证限制与设计说明故障排查插件未启用DR definitions 缺失federation parameter 或 policy 缺失消息未出现在 DR 侧认证、网络或 TLS 问题中断了链接在拓扑变更或重连后出现重复消息适用场景
在需要以下一种或多种能力时,使用 federated exchanges:
- 让远端 RabbitMQ 集群保持 warm 状态,并保留所选消息流的相对最新副本。
- 只复制 exchange 的子集,而不是整个集群状态。
- 容忍临时的 WAN 或跨集群连接故障,并允许链接自动重连。
- 准备一个备用站点,在应用将 producer 和 consumer 切换到 DR 环境后能够接收消息。
在需要以下一种或多种能力时,federated exchanges 通常不是正确的解决方案:
- 严格的
RPO=0保证。 - 跨集群的同步零丢失复制。
- 应用侧 producer 或 consumer 的自动故障切换。
- RabbitMQ users、permissions、exchanges、queues、bindings、policies、TLS 材料、Kubernetes 资源或应用配置的自动同步。
- 作为备份、恢复或持久化存储规划的替代方案。
DR 模式的工作方式
在此示例中:
rabbitmq-primary承载 upstream exchangeapp.events。rabbitmq-dr承载 downstream exchangeapp-events-dr。- 在
rabbitmq-dr上配置了 federation link。 rabbitmq-dr上的 policy 将app-events-dr选为 federated exchange。- downstream 侧的 bindings 决定从 upstream 侧请求哪些消息。
从概念上讲,在主站点向 app.events 发布的消息,会被复制到 DR 站点的 app-events-dr,就像这些消息是直接本地发布到 downstream exchange 一样。
请使用与应用故障切换计划匹配的 exchange、queue 和 binding 名称。此示例使用不同的主站点和 DR exchange 名称,以明确方向。如果应用在故障切换后必须使用相同的 exchange 或 queue 名称,请在 DR 集群中声明这些名称,并一致地更新相关命令。
Federation links 是异步的。在网络分区、认证失败或 upstream 故障期间,消息可能会落后,或者在链接重连之前无法在 DR 侧获取。不要将该模式描述为同步 HA 或保证零丢失复制。
前提条件
在配置 federated exchanges 之前,请确保满足以下条件:
- 您有两个可达的 RabbitMQ 实例或集群,本文中分别命名为
rabbitmq-primary和rabbitmq-dr。 - downstream DR 集群已启用
rabbitmq_federation插件。 - 仅当您希望在管理 UI 或 API 中看到 federation 页面时,才在 downstream DR 集群上启用
rabbitmq_federation_management。 - upstream primary 集群在 federated exchanges 场景下不需要
rabbitmq_federation或rabbitmq_federation_management。 - 您可以从 DR 侧通过
<primary-host>:<primary-port>访问 upstream AMQP listener。 - 您可以访问两个环境的 management UI 或 CLI。
- federation URI 中的 upstream user 具有连接所需 virtual host 并访问 upstream exchange 的权限。
- 您了解与环境对应的访问地址、凭证和 namespace 值。
- 已为应用故障切换规划好 DR 环境所需的 RabbitMQ definitions。
您还应考虑以下设计要求:
- 对重要消息流使用 durable exchanges 和 durable queues。
- 当工作负载要求 broker 重启后可恢复时,发布 persistent 消息。
- 使 consumer 具备幂等性,因为在重连、拓扑变更或多路径路由期间仍可能发生重复。
- 除非已明确设计了循环防护和重复处理,否则避免双向或 mesh 风格拓扑。
启用 Federation 插件
此模式所需的插件位于 downstream DR 集群上。在 rabbitmq-dr 上启用 rabbitmq_federation。仅当您希望在管理 UI 或 API 中看到 federation 页面时,才在 rabbitmq-dr 上启用 rabbitmq_federation_management。upstream primary 集群不需要 federation 插件。
以下示例保持 primary 集群不配置 federation 插件,并通过 spec.rabbitmq.additionalPlugins 在 DR 集群上启用这些插件。
operator 滚动更新后的 StatefulSet 完成后,请确认 DR 侧已启用插件:
输出中应包含 rabbitmq_federation。如果启用了 management 插件,输出中还应包含 rabbitmq_federation_management。
为 DR 就绪准备 RabbitMQ Definitions
Federation 在 exchanges 之间移动选定消息。它不会同步应用拓扑、安全定义或平台资源。在执行 DR 切换之前,DR 集群中必须已经包含工作负载所需的 virtual hosts、exchanges、queues、bindings、users、permissions、policies、parameters、TLS 材料、Kubernetes Secret 对象以及应用配置。
请使用以下方法之一:
- 如果 RabbitMQ definitions 由 GitOps、应用启动引导代码或其他声明式流程管理,请将相同的预期 definitions 应用于 DR 集群,并在那里验证。
- 如果 primary 拓扑已存在且未以声明式方式管理,请从 primary 集群导出 definitions,并将审查后的 definitions 导入 DR 集群。
definitions 的导出和导入是一个时间点操作。其范围取决于您执行的是集群级导出还是单个 vhost 导出:
- 当 DR 集群必须预先填充 virtual hosts、users、permissions、exchanges、queues、bindings、runtime parameters 和 policies 时,请使用集群级导出。
- 仅当目标 virtual host、users 和 permissions 已在 DR 集群上准备就绪,而您只需要迁移该 virtual host 的拓扑时,才使用单个 vhost 导出。在 RabbitMQ 3.8.16 中,
rabbitmqadmin --vhost / export ...文件包含 vhost 作用域的拓扑键,例如exchanges、queues、bindings、parameters和policies,但不包含users、permissions或vhosts。
definitions 的导出和导入不会复制 queue 内容、durable message stores、stream 数据、存储在 RabbitMQ 之外的 Kubernetes 资源、TLS 密钥材料或应用配置。
为确保 DR 就绪,除非您明确只需要单个 vhost 的拓扑文件,否则请从 primary 集群导出集群级 definitions:
在导入 primary-definitions.json 之前,请先审查它。移除或调整仅适用于 primary 站点的 definitions,例如 upstream URI、shovel 参数、不应应用于 DR 的 policies、测试用户,或在站点之间故意不同的拓扑。请将该文件视为敏感文件,因为它可能包含用户密码哈希和运维配置。
将审查后的集群级 definitions 导入 DR 集群:
如果您只需要迁移 DR 集群中已存在的某个 virtual host 的拓扑,请在导出和导入命令中都加上 --vhost <vhost>:
验证应用在故障切换后所需的 definitions:
如果您导入的是集群级 definitions,还应验证所需的 virtual hosts、users 和 permissions 是否存在:
如果您导入 primary definitions 后仍然使用 DR 专用的 downstream exchange 或 queue,请在导入后创建这些 DR 专用对象。下面的操作步骤会显式声明示例对象。如果您的 DR definitions 中已经包含等价对象,请先验证后跳过重复声明命令。
操作步骤
1. 准备 primary exchange
在 rabbitmq-primary 上声明 upstream exchange。下面的示例使用名为 app.events 的 durable topic exchange。
在下面的命令中,rabbitmqadmin 连接到 RabbitMQ management endpoint,例如端口 15672。后面配置的 federation upstream URI 必须使用 AMQP listener,例如 amqp:// 对应 5672,amqps:// 对应 5671。
如果您的应用已经向现有 exchange 发布消息,请复用该 exchange 名称,而不是创建新的 exchange。
2. 准备 DR exchange、queue 和 binding
在 rabbitmq-dr 上声明一个 downstream exchange、一个 DR queue 和一个 binding。队列只需声明一次,然后通过 queue policy 做消息保留,而不要使用不同参数重新声明队列。
downstream binding 控制从 upstream 侧请求哪些 routing key。binding 更改会异步传播,因此在期望新的过滤行为生效之前,请留出一点时间。
3. 定义 DR backlog 保留窗口
federated exchanges 最好被视为一种有边界的 warm-standby 模式。DR 侧应保留消息的时间窗口是经过设计的,而不是在 standby consumer 停止时无限积累。
请在 DR queue 上使用 message TTL 来定义消息可以在 standby backlog 中保留多长时间。不要在这个 durable standby queue 上设置 expires 或 x-expires。Queue expiration 会在一段不活动时间后删除未使用的 queue。在 warm-standby 设计中,DR queue 可能会有意长时间没有活动 consumer,因此 queue expiration 可能会删除该 queue 以及已积累的 DR backlog。
以下示例通过应用 queue policy,使 app-events-dr-q 中的消息最多保留 24 小时:
如果已有其他 queue policy 作用于 DR queue,请在该 policy 中添加 message-ttl,而不是创建冲突的 policy。一个 queue 可能会受 policy 优先级影响,因此请在变更后验证当前生效的 policy。
验证 policy 和 queue:
此示例将 standby backlog 限定为 24 小时的消息保留窗口。更短的值会减少磁盘增长,但会缩小恢复窗口。更长的值会在故障切换期间提供更多可用的 standby 历史,但也会增加 DR 侧的磁盘使用和 backlog 风险。
请根据以下因素选择保留窗口:
- 该工作负载可接受的恢复点。
- primary 站点故障期间预期的峰值消息量。
- DR 集群可用的存储空间。
4. 在 DR 集群上配置 federation upstream
在 rabbitmq-dr 上运行以下命令,创建名为 primary-app-events 的 federation upstream:
此配置的含义如下:
uri:upstream RabbitMQ 集群的 AMQP 连接地址。末尾的%2f是默认/virtual host 的 URL 编码形式。如果 upstream exchange 位于非默认 virtual host 中,请将%2f替换为该 upstream virtual host 的 URL 编码名称。exchange:要从中消费的 upstream exchange。max-hops:限制消息最多可经过多少条 federation link,并有助于避免循环。reconnect-delay:控制链接断开后,在重新连接前等待多长时间。
如果您的环境需要 TLS,请使用 amqps:// URI,并使用 RabbitMQ AMQP URI 支持的相关 TLS 连接参数。
5. 为 downstream exchange 应用 federation policy
在 rabbitmq-dr 上应用 policy,使 downstream exchange app-events-dr 使用 upstream 定义。
验证 upstream parameter 和 policy 是否存在:
6. 向 primary exchange 发布测试消息
向 rabbitmq-primary 上的 app.events 发布一个或多个持久化测试消息。
7. 验证 DR 侧接收到复制消息
在不删除 backlog 的情况下,检查 rabbitmq-dr 上绑定到 downstream exchange 的 queue:
如果 federation 正常工作,DR queue 会接收到发布到 upstream exchange 的消息,前提是这些消息的 routing key 与 downstream binding 匹配。ack_requeue_true 会将检查过的消息重新入队,因此在验证期间不会消耗 standby backlog。如果您使用的是一次性测试消息或一次性测试 queue 进行验证,在确认符合 DR 计划后,可以使用破坏性的确认模式。
8. 准备应用故障切换流程
Federation 只会移动选定的消息流,不会自动切换应用。对于实际的 DR 切换,请定义应用侧步骤,以便:
- 将 producer 重定向到
rabbitmq-dr。 - 将 consumer 重定向到
rabbitmq-dr。 - 确认 DR 侧已存在所需的 exchanges、queues、bindings、users、policies、TLS 材料、Kubernetes resources 和应用 secrets。
- 确认 DR queue backlog 处于恢复计划所期望的 message TTL 窗口内。
- 决定当 primary 站点恢复可用时,如何恢复或协调流量。
验证
配置完成后,使用以下检查项:
如果您还在 DR 集群上启用了 rabbitmq_federation_management,则可以通过 management UI 中与 federation 相关的页面查看 federation 配置和运行时状态。
限制与设计说明
使用 federation 进行 DR 时,请牢记以下限制:
- 复制是异步的,因此在正常运行期间会出现 lag,并且在网络问题期间可能加剧。
- federated exchange 不能替代 mirrored 或 quorum queue 设计、持久化存储或备份。
- 不保证
RPO=0。当链接不可用时,消息可能会延迟或在 DR 侧缺失。 - Federation 不会同步 RabbitMQ definitions。请通过导出和导入、GitOps、应用启动引导代码或其他受控流程单独管理 definitions。
- definitions 的导出和导入是快照操作。当 primary 拓扑、users、permissions、parameters 或 policies 发生变化时,请重新执行该操作或更新 DR definitions。
- downstream bindings 会影响从 upstream 复制的内容。binding 更新是最终一致的,而不是即时生效的。
- 直接发送到 downstream exchange 的发布,不会反向体现到只在 upstream 侧绑定的 queue。
- 默认 exchange 和 internal exchanges 不能被 federate。
max-hops有助于避免循环,但在复杂拓扑中无法消除所有重复场景。- durable exchanges 和 queues 可以降低恢复风险,但不会把 federation 从异步复制变成同步复制。
- Message TTL 定义了实际可行的 warm-standby backlog 窗口。更大的值会增加可用回放窗口,但也会增加 DR 侧的磁盘消耗。
- 对于必须在长时间无 consumer 的情况下仍然存活的 durable standby queue,不应使用 queue expiration。
- 认证、授权、DNS、端口、防火墙规则以及 TLS certificate 信任都必须正确,链接才能保持健康。
故障排查
插件未启用
现象:
在 rabbitmq-dr 上执行 rabbitmq-plugins list -e 时,没有显示 rabbitmq_federation,或者在您期望支持 management UI 或 API 时,没有显示 rabbitmq_federation_management。
检查项:
- 确认
rabbitmq-dr上的spec.rabbitmq.additionalPlugins包含rabbitmq_federation。 - 如果您需要 federation management UI 或 API 页面,请确认
rabbitmq-dr上的spec.rabbitmq.additionalPlugins也包含rabbitmq_federation_management。 - 检查 DR RabbitMQ pods 是否在 spec 变更后已经重启。
建议:
更新 rabbitmq-dr 的 RabbitmqCluster 资源,等待 rollout 完成,然后再次在 DR 集群上验证插件。
DR definitions 缺失
现象:
Federation 已配置,但在故障切换后,producer 或 consumer 失败,因为 rabbitmq-dr 上缺少 exchanges、queues、bindings、users、permissions 或 TLS 材料。
检查项:
- 确认 definitions 是通过 GitOps、应用启动引导代码,还是 RabbitMQ definitions 的导出和导入进行管理的。
- 在
rabbitmq-dr上列出应用所需的 exchanges、queues、bindings、users、permissions、parameters 和 policies。 - 确认 Kubernetes
Secret对象、证书以及应用连接配置也已为 DR 环境准备好。 - 如果 definitions 已导入,请确认已在导入前审查并调整站点特定 definitions。
建议: 在声明 DR 站点就绪之前,先同步所需的 RabbitMQ definitions 和平台资源。不要依赖 federation 来创建应用拓扑或安全定义。
federation parameter 或 policy 缺失
现象:
在 rabbitmq-dr 上执行 rabbitmqctl list_parameters -p / 或 rabbitmqctl list_policies -p / 时,没有看到预期对象。
检查项:
- 重新执行
set_parameter和set_policy命令。 - 确保这些命令是在 DR 集群和正确的 virtual host 上执行的。
- 确认 policy pattern 与 downstream exchange 名称完全匹配。
- 运行
kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl federation_status,确认链接是否缺失或已停止。
建议: 使用正确的 virtual host、exchange 名称和 policy pattern 重新创建 parameter 和 policy。
消息未出现在 DR 侧
现象:
向 app.events 发布成功,但 app-events-dr-q 仍为空。
检查项:
- 确认 upstream exchange 是
app.events,downstream exchange 是app-events-dr。 - 确认 downstream queue 已使用预期 routing key 绑定到
app-events-dr。 - 发布一个与 downstream binding 匹配的 routing key,例如
orders.created。 - 留出时间让异步 binding 传播和链接重连。
- 在
rabbitmq-dr上检查rabbitmqctl federation_status。健康链接通常会被报告为running。 - 检查 message TTL policy 是否在您消费之前就已过期较旧的测试消息。
建议: 修正 exchange 名称、binding pattern、routing key 或保留窗口,然后重新测试。
认证、网络或 TLS 问题中断了链接
现象: upstream parameter 存在,但复制不能稳定进行。
检查项:
- 验证 DR RabbitMQ pods 可以访问
<primary-host>:<primary-port>。 - 验证 upstream URI 中的用户可以连接到 upstream virtual host。
- 如果使用 TLS,请验证证书信任,并在 upstream URI 中使用
amqps://。 - 运行
kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl federation_status,查看链接是否为stopped或是否存在重复失败。
建议: 首先解决连接性或凭证问题,然后等待 federation link 重新连接。
在拓扑变更或重连后出现重复消息
现象: DR 侧的 consumer 收到了重复的业务事件。
检查项:
- 确认
max-hops设置得足够保守,例如在简单的 primary-to-DR 拓扑中设置为1。 - 检查是否意外配置了多个 upstream 或双向链接。
- 审查应用在重连和重试过程中的行为。
建议:
简化拓扑,将 max-hops 保持较低,并使 consumer 具备幂等性。