使用 Federated Exchanges 实现 RabbitMQ 灾备

本文介绍了一种基于 federated exchanges 的 RabbitMQ 灾备(DR)模式。在该模式中,主 RabbitMQ 实例或集群会将业务消息发布到 upstream exchange,而备用 DR RabbitMQ 实例或集群则使用一个从 upstream exchange federate 而来的 downstream exchange。

Federated exchanges 可为选定的消息流提供跨集群、异步的消息复制。它们适用于 warm-standby 设计、区域分发以及选择性复制。它们不是同步高可用机制,也不能替代应用故障切换、定期备份、RabbitMQ definitions 管理或持久队列设计。

适用场景

在需要以下一种或多种能力时,使用 federated exchanges:

  • 让远端 RabbitMQ 集群保持 warm 状态,并保留所选消息流的相对最新副本。
  • 只复制 exchange 的子集,而不是整个集群状态。
  • 容忍临时的 WAN 或跨集群连接故障,并允许链接自动重连。
  • 准备一个备用站点,在应用将 producer 和 consumer 切换到 DR 环境后能够接收消息。

在需要以下一种或多种能力时,federated exchanges 通常不是正确的解决方案:

  • 严格的 RPO=0 保证。
  • 跨集群的同步零丢失复制。
  • 应用侧 producer 或 consumer 的自动故障切换。
  • RabbitMQ users、permissions、exchanges、queues、bindings、policies、TLS 材料、Kubernetes 资源或应用配置的自动同步。
  • 作为备份、恢复或持久化存储规划的替代方案。

DR 模式的工作方式

在此示例中:

  • rabbitmq-primary 承载 upstream exchange app.events
  • rabbitmq-dr 承载 downstream exchange app-events-dr
  • rabbitmq-dr 上配置了 federation link。
  • rabbitmq-dr 上的 policy 将 app-events-dr 选为 federated exchange。
  • downstream 侧的 bindings 决定从 upstream 侧请求哪些消息。

从概念上讲,在主站点向 app.events 发布的消息,会被复制到 DR 站点的 app-events-dr,就像这些消息是直接本地发布到 downstream exchange 一样。

请使用与应用故障切换计划匹配的 exchange、queue 和 binding 名称。此示例使用不同的主站点和 DR exchange 名称,以明确方向。如果应用在故障切换后必须使用相同的 exchange 或 queue 名称,请在 DR 集群中声明这些名称,并一致地更新相关命令。

Federation 无法保证的内容

Federation links 是异步的。在网络分区、认证失败或 upstream 故障期间,消息可能会落后,或者在链接重连之前无法在 DR 侧获取。不要将该模式描述为同步 HA 或保证零丢失复制。

前提条件

在配置 federated exchanges 之前,请确保满足以下条件:

  1. 您有两个可达的 RabbitMQ 实例或集群,本文中分别命名为 rabbitmq-primaryrabbitmq-dr
  2. downstream DR 集群已启用 rabbitmq_federation 插件。
  3. 仅当您希望在管理 UI 或 API 中看到 federation 页面时,才在 downstream DR 集群上启用 rabbitmq_federation_management
  4. upstream primary 集群在 federated exchanges 场景下不需要 rabbitmq_federationrabbitmq_federation_management
  5. 您可以从 DR 侧通过 <primary-host>:<primary-port> 访问 upstream AMQP listener。
  6. 您可以访问两个环境的 management UI 或 CLI。
  7. federation URI 中的 upstream user 具有连接所需 virtual host 并访问 upstream exchange 的权限。
  8. 您了解与环境对应的访问地址、凭证和 namespace 值。
  9. 已为应用故障切换规划好 DR 环境所需的 RabbitMQ definitions。

您还应考虑以下设计要求:

  • 对重要消息流使用 durable exchanges 和 durable queues。
  • 当工作负载要求 broker 重启后可恢复时,发布 persistent 消息。
  • 使 consumer 具备幂等性,因为在重连、拓扑变更或多路径路由期间仍可能发生重复。
  • 除非已明确设计了循环防护和重复处理,否则避免双向或 mesh 风格拓扑。

启用 Federation 插件

此模式所需的插件位于 downstream DR 集群上。在 rabbitmq-dr 上启用 rabbitmq_federation。仅当您希望在管理 UI 或 API 中看到 federation 页面时,才在 rabbitmq-dr 上启用 rabbitmq_federation_management。upstream primary 集群不需要 federation 插件。

以下示例保持 primary 集群不配置 federation 插件,并通过 spec.rabbitmq.additionalPlugins 在 DR 集群上启用这些插件。

apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: rabbitmq-primary
  namespace: <namespace>
spec:
  replicas: 3
  ...
---
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: rabbitmq-dr
  namespace: <namespace>
spec:
  replicas: 3
  rabbitmq:
    additionalPlugins:
      - rabbitmq_federation
      - rabbitmq_federation_management
  ...

operator 滚动更新后的 StatefulSet 完成后,请确认 DR 侧已启用插件:

kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmq-plugins list -e

输出中应包含 rabbitmq_federation。如果启用了 management 插件,输出中还应包含 rabbitmq_federation_management

为 DR 就绪准备 RabbitMQ Definitions

Federation 在 exchanges 之间移动选定消息。它不会同步应用拓扑、安全定义或平台资源。在执行 DR 切换之前,DR 集群中必须已经包含工作负载所需的 virtual hosts、exchanges、queues、bindings、users、permissions、policies、parameters、TLS 材料、Kubernetes Secret 对象以及应用配置。

请使用以下方法之一:

  • 如果 RabbitMQ definitions 由 GitOps、应用启动引导代码或其他声明式流程管理,请将相同的预期 definitions 应用于 DR 集群,并在那里验证。
  • 如果 primary 拓扑已存在且未以声明式方式管理,请从 primary 集群导出 definitions,并将审查后的 definitions 导入 DR 集群。

definitions 的导出和导入是一个时间点操作。其范围取决于您执行的是集群级导出还是单个 vhost 导出:

  • 当 DR 集群必须预先填充 virtual hosts、users、permissions、exchanges、queues、bindings、runtime parameters 和 policies 时,请使用集群级导出。
  • 仅当目标 virtual host、users 和 permissions 已在 DR 集群上准备就绪,而您只需要迁移该 virtual host 的拓扑时,才使用单个 vhost 导出。在 RabbitMQ 3.8.16 中,rabbitmqadmin --vhost / export ... 文件包含 vhost 作用域的拓扑键,例如 exchangesqueuesbindingsparameterspolicies,但不包含 userspermissionsvhosts

definitions 的导出和导入不会复制 queue 内容、durable message stores、stream 数据、存储在 RabbitMQ 之外的 Kubernetes 资源、TLS 密钥材料或应用配置。

为确保 DR 就绪,除非您明确只需要单个 vhost 的拓扑文件,否则请从 primary 集群导出集群级 definitions:

rabbitmqadmin \
  --host <primary-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  export primary-definitions.json

在导入 primary-definitions.json 之前,请先审查它。移除或调整仅适用于 primary 站点的 definitions,例如 upstream URI、shovel 参数、不应应用于 DR 的 policies、测试用户,或在站点之间故意不同的拓扑。请将该文件视为敏感文件,因为它可能包含用户密码哈希和运维配置。

将审查后的集群级 definitions 导入 DR 集群:

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  import primary-definitions.json

如果您只需要迁移 DR 集群中已存在的某个 virtual host 的拓扑,请在导出和导入命令中都加上 --vhost <vhost>

rabbitmqadmin \
  --host <primary-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  export primary-vhost-topology.json

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  import primary-vhost-topology.json

验证应用在故障切换后所需的 definitions:

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  list exchanges name type durable

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  list queues name durable policy arguments

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  list bindings source_name destination_name routing_key

如果您导入的是集群级 definitions,还应验证所需的 virtual hosts、users 和 permissions 是否存在:

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  list vhosts name

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  list users name tags

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  list permissions user vhost configure write read

如果您导入 primary definitions 后仍然使用 DR 专用的 downstream exchange 或 queue,请在导入后创建这些 DR 专用对象。下面的操作步骤会显式声明示例对象。如果您的 DR definitions 中已经包含等价对象,请先验证后跳过重复声明命令。

操作步骤

1. 准备 primary exchange

rabbitmq-primary 上声明 upstream exchange。下面的示例使用名为 app.events 的 durable topic exchange。

在下面的命令中,rabbitmqadmin 连接到 RabbitMQ management endpoint,例如端口 15672。后面配置的 federation upstream URI 必须使用 AMQP listener,例如 amqp:// 对应 5672amqps:// 对应 5671

rabbitmqadmin \
  --host <primary-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  declare exchange name=app.events type=topic durable=true

如果您的应用已经向现有 exchange 发布消息,请复用该 exchange 名称,而不是创建新的 exchange。

2. 准备 DR exchange、queue 和 binding

rabbitmq-dr 上声明一个 downstream exchange、一个 DR queue 和一个 binding。队列只需声明一次,然后通过 queue policy 做消息保留,而不要使用不同参数重新声明队列。

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  declare exchange name=app-events-dr type=topic durable=true

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  declare queue name=app-events-dr-q durable=true

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  declare binding source=app-events-dr destination_type=queue destination=app-events-dr-q routing_key="orders.*"

downstream binding 控制从 upstream 侧请求哪些 routing key。binding 更改会异步传播,因此在期望新的过滤行为生效之前,请留出一点时间。

3. 定义 DR backlog 保留窗口

federated exchanges 最好被视为一种有边界的 warm-standby 模式。DR 侧应保留消息的时间窗口是经过设计的,而不是在 standby consumer 停止时无限积累。

请在 DR queue 上使用 message TTL 来定义消息可以在 standby backlog 中保留多长时间。不要在这个 durable standby queue 上设置 expiresx-expires。Queue expiration 会在一段不活动时间后删除未使用的 queue。在 warm-standby 设计中,DR queue 可能会有意长时间没有活动 consumer,因此 queue expiration 可能会删除该 queue 以及已积累的 DR backlog。

以下示例通过应用 queue policy,使 app-events-dr-q 中的消息最多保留 24 小时:

kubectl exec -n <namespace> rabbitmq-dr-server-0 -- \
  rabbitmqctl set_policy -p / dr-queue-retention \
  "^app-events-dr-q$" \
  '{"message-ttl":86400000}' \
  --priority 20 \
  --apply-to queues

如果已有其他 queue policy 作用于 DR queue,请在该 policy 中添加 message-ttl,而不是创建冲突的 policy。一个 queue 可能会受 policy 优先级影响,因此请在变更后验证当前生效的 policy。

验证 policy 和 queue:

kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl list_policies -p /

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  list queues name durable policy arguments messages

此示例将 standby backlog 限定为 24 小时的消息保留窗口。更短的值会减少磁盘增长,但会缩小恢复窗口。更长的值会在故障切换期间提供更多可用的 standby 历史,但也会增加 DR 侧的磁盘使用和 backlog 风险。

请根据以下因素选择保留窗口:

  • 该工作负载可接受的恢复点。
  • primary 站点故障期间预期的峰值消息量。
  • DR 集群可用的存储空间。

4. 在 DR 集群上配置 federation upstream

rabbitmq-dr 上运行以下命令,创建名为 primary-app-events 的 federation upstream:

kubectl exec -n <namespace> rabbitmq-dr-server-0 -- \
  rabbitmqctl set_parameter -p / federation-upstream primary-app-events \
  '{"uri":"amqp://<username>:<password>@<primary-host>:<primary-port>/%2f","exchange":"app.events","max-hops":1,"reconnect-delay":5}'

此配置的含义如下:

  • uri:upstream RabbitMQ 集群的 AMQP 连接地址。末尾的 %2f 是默认 / virtual host 的 URL 编码形式。如果 upstream exchange 位于非默认 virtual host 中,请将 %2f 替换为该 upstream virtual host 的 URL 编码名称。
  • exchange:要从中消费的 upstream exchange。
  • max-hops:限制消息最多可经过多少条 federation link,并有助于避免循环。
  • reconnect-delay:控制链接断开后,在重新连接前等待多长时间。

如果您的环境需要 TLS,请使用 amqps:// URI,并使用 RabbitMQ AMQP URI 支持的相关 TLS 连接参数。

5. 为 downstream exchange 应用 federation policy

rabbitmq-dr 上应用 policy,使 downstream exchange app-events-dr 使用 upstream 定义。

kubectl exec -n <namespace> rabbitmq-dr-server-0 -- \
  rabbitmqctl set_policy -p / dr-federated-exchange \
  "^app-events-dr$" \
  '{"federation-upstream":"primary-app-events"}' \
  --priority 10 \
  --apply-to exchanges

验证 upstream parameter 和 policy 是否存在:

kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl list_parameters -p /
kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl list_policies -p /
kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl federation_status

6. 向 primary exchange 发布测试消息

rabbitmq-primary 上的 app.events 发布一个或多个持久化测试消息。

rabbitmqadmin \
  --host <primary-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  publish exchange=app.events routing_key=orders.created payload='{"event":"created","id":"1001"}' properties='{"delivery_mode":2,"content_type":"application/json"}'

rabbitmqadmin \
  --host <primary-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  publish exchange=app.events routing_key=orders.updated payload='{"event":"updated","id":"1001"}' properties='{"delivery_mode":2,"content_type":"application/json"}'

7. 验证 DR 侧接收到复制消息

在不删除 backlog 的情况下,检查 rabbitmq-dr 上绑定到 downstream exchange 的 queue:

rabbitmqadmin \
  --host <dr-host> \
  --port 15672 \
  --username <username> \
  --password <password> \
  --vhost / \
  get queue=app-events-dr-q count=10 ackmode=ack_requeue_true

如果 federation 正常工作,DR queue 会接收到发布到 upstream exchange 的消息,前提是这些消息的 routing key 与 downstream binding 匹配。ack_requeue_true 会将检查过的消息重新入队,因此在验证期间不会消耗 standby backlog。如果您使用的是一次性测试消息或一次性测试 queue 进行验证,在确认符合 DR 计划后,可以使用破坏性的确认模式。

8. 准备应用故障切换流程

Federation 只会移动选定的消息流,不会自动切换应用。对于实际的 DR 切换,请定义应用侧步骤,以便:

  1. 将 producer 重定向到 rabbitmq-dr
  2. 将 consumer 重定向到 rabbitmq-dr
  3. 确认 DR 侧已存在所需的 exchanges、queues、bindings、users、policies、TLS 材料、Kubernetes resources 和应用 secrets。
  4. 确认 DR queue backlog 处于恢复计划所期望的 message TTL 窗口内。
  5. 决定当 primary 站点恢复可用时,如何恢复或协调流量。

验证

配置完成后,使用以下检查项:

检查项命令预期结果
DR 上已启用 Federation 插件kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmq-plugins list -erabbitmq_federation 已启用。如果启用了 management 插件,则 rabbitmq_federation_management 也已启用
DR vhost 拓扑已存在rabbitmqadmin --host <dr-host> --port 15672 --username <username> --password <password> --vhost / list exchanges name type durable, rabbitmqadmin --host <dr-host> --port 15672 --username <username> --password <password> --vhost / list queues name durable policy arguments,以及 rabbitmqadmin --host <dr-host> --port 15672 --username <username> --password <password> --vhost / list bindings source_name destination_name routing_keyDR 工作负载所需的 exchanges、queues、bindings 和 policies 已存在于目标 virtual host 中
DR 集群级 definitions 已存在rabbitmqadmin --host <dr-host> --port 15672 --username <username> --password <password> list vhosts namerabbitmqadmin --host <dr-host> --port 15672 --username <username> --password <password> list users name tags,以及 rabbitmqadmin --host <dr-host> --port 15672 --username <username> --password <password> list permissions user vhost configure write read当您依赖集群级 definitions 导入时,DR 工作负载所需的 virtual hosts、users 和 permissions 已存在
DR queue 的 message TTL policy 已存在kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl list_policies -p /dr-queue-retention policy 或等效的现有 policy 已将 message-ttl 应用于 DR queue
DR 上存在 federation upstreamkubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl list_parameters -p /列出了名为 primary-app-eventsfederation-upstream 参数
DR 上存在 federation policykubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl list_policies -p /dr-federated-exchange policy 作用于 exchanges
DR 上 federation link 健康kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl federation_statusprimary-app-events 的链接状态被报告为 runningstopped、反复重启或错误状态表示连接或配置存在问题
primary exchange 已存在rabbitmqadmin --host <primary-host> --port 15672 --username <username> --password <password> --vhost / list exchanges name type durableapp.events 存在且为 durable
消息复制正常,且不会消耗 standby backlograbbitmqadmin --host <dr-host> --port 15672 --username <username> --password <password> --vhost / get queue=app-events-dr-q count=10 ackmode=ack_requeue_true发布到 app.events 的消息会出现在 DR 侧,并在检查后重新入队

如果您还在 DR 集群上启用了 rabbitmq_federation_management,则可以通过 management UI 中与 federation 相关的页面查看 federation 配置和运行时状态。

限制与设计说明

使用 federation 进行 DR 时,请牢记以下限制:

  • 复制是异步的,因此在正常运行期间会出现 lag,并且在网络问题期间可能加剧。
  • federated exchange 不能替代 mirrored 或 quorum queue 设计、持久化存储或备份。
  • 不保证 RPO=0。当链接不可用时,消息可能会延迟或在 DR 侧缺失。
  • Federation 不会同步 RabbitMQ definitions。请通过导出和导入、GitOps、应用启动引导代码或其他受控流程单独管理 definitions。
  • definitions 的导出和导入是快照操作。当 primary 拓扑、users、permissions、parameters 或 policies 发生变化时,请重新执行该操作或更新 DR definitions。
  • downstream bindings 会影响从 upstream 复制的内容。binding 更新是最终一致的,而不是即时生效的。
  • 直接发送到 downstream exchange 的发布,不会反向体现到只在 upstream 侧绑定的 queue。
  • 默认 exchange 和 internal exchanges 不能被 federate。
  • max-hops 有助于避免循环,但在复杂拓扑中无法消除所有重复场景。
  • durable exchanges 和 queues 可以降低恢复风险,但不会把 federation 从异步复制变成同步复制。
  • Message TTL 定义了实际可行的 warm-standby backlog 窗口。更大的值会增加可用回放窗口,但也会增加 DR 侧的磁盘消耗。
  • 对于必须在长时间无 consumer 的情况下仍然存活的 durable standby queue,不应使用 queue expiration。
  • 认证、授权、DNS、端口、防火墙规则以及 TLS certificate 信任都必须正确,链接才能保持健康。

故障排查

插件未启用

现象: 在 rabbitmq-dr 上执行 rabbitmq-plugins list -e 时,没有显示 rabbitmq_federation,或者在您期望支持 management UI 或 API 时,没有显示 rabbitmq_federation_management

检查项:

  • 确认 rabbitmq-dr 上的 spec.rabbitmq.additionalPlugins 包含 rabbitmq_federation
  • 如果您需要 federation management UI 或 API 页面,请确认 rabbitmq-dr 上的 spec.rabbitmq.additionalPlugins 也包含 rabbitmq_federation_management
  • 检查 DR RabbitMQ pods 是否在 spec 变更后已经重启。

建议: 更新 rabbitmq-drRabbitmqCluster 资源,等待 rollout 完成,然后再次在 DR 集群上验证插件。

DR definitions 缺失

现象: Federation 已配置,但在故障切换后,producer 或 consumer 失败,因为 rabbitmq-dr 上缺少 exchanges、queues、bindings、users、permissions 或 TLS 材料。

检查项:

  • 确认 definitions 是通过 GitOps、应用启动引导代码,还是 RabbitMQ definitions 的导出和导入进行管理的。
  • rabbitmq-dr 上列出应用所需的 exchanges、queues、bindings、users、permissions、parameters 和 policies。
  • 确认 Kubernetes Secret 对象、证书以及应用连接配置也已为 DR 环境准备好。
  • 如果 definitions 已导入,请确认已在导入前审查并调整站点特定 definitions。

建议: 在声明 DR 站点就绪之前,先同步所需的 RabbitMQ definitions 和平台资源。不要依赖 federation 来创建应用拓扑或安全定义。

federation parameter 或 policy 缺失

现象: 在 rabbitmq-dr 上执行 rabbitmqctl list_parameters -p /rabbitmqctl list_policies -p / 时,没有看到预期对象。

检查项:

  • 重新执行 set_parameterset_policy 命令。
  • 确保这些命令是在 DR 集群和正确的 virtual host 上执行的。
  • 确认 policy pattern 与 downstream exchange 名称完全匹配。
  • 运行 kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl federation_status,确认链接是否缺失或已停止。

建议: 使用正确的 virtual host、exchange 名称和 policy pattern 重新创建 parameter 和 policy。

消息未出现在 DR 侧

现象: 向 app.events 发布成功,但 app-events-dr-q 仍为空。

检查项:

  • 确认 upstream exchange 是 app.events,downstream exchange 是 app-events-dr
  • 确认 downstream queue 已使用预期 routing key 绑定到 app-events-dr
  • 发布一个与 downstream binding 匹配的 routing key,例如 orders.created
  • 留出时间让异步 binding 传播和链接重连。
  • rabbitmq-dr 上检查 rabbitmqctl federation_status。健康链接通常会被报告为 running
  • 检查 message TTL policy 是否在您消费之前就已过期较旧的测试消息。

建议: 修正 exchange 名称、binding pattern、routing key 或保留窗口,然后重新测试。

认证、网络或 TLS 问题中断了链接

现象: upstream parameter 存在,但复制不能稳定进行。

检查项:

  • 验证 DR RabbitMQ pods 可以访问 <primary-host>:<primary-port>
  • 验证 upstream URI 中的用户可以连接到 upstream virtual host。
  • 如果使用 TLS,请验证证书信任,并在 upstream URI 中使用 amqps://
  • 运行 kubectl exec -n <namespace> rabbitmq-dr-server-0 -- rabbitmqctl federation_status,查看链接是否为 stopped 或是否存在重复失败。

建议: 首先解决连接性或凭证问题,然后等待 federation link 重新连接。

在拓扑变更或重连后出现重复消息

现象: DR 侧的 consumer 收到了重复的业务事件。

检查项:

  • 确认 max-hops 设置得足够保守,例如在简单的 primary-to-DR 拓扑中设置为 1
  • 检查是否意外配置了多个 upstream 或双向链接。
  • 审查应用在重连和重试过程中的行为。

建议: 简化拓扑,将 max-hops 保持较低,并使 consumer 具备幂等性。