使用 Shovel 进行消息迁移

当你需要在 broker、cluster 或 virtual host 之间移动消息时,请使用 RabbitMQ Shovel。Shovel 非常适合队列到队列迁移、临时转发,或将消息受控重放到另一个环境中。

Shovel 不是完整的集群同步机制。它不会同步用户、权限、virtual host、exchange、queue、binding、Kubernetes 资源或应用配置。在依赖 Shovel 进行工作负载迁移之前,请单独准备这些对象。

适用场景

在需要以下一种或多种场景时使用 Shovel:

  • 将消息从新 cluster 中的一个 queue 迁移到另一个 queue。
  • 在受控迁移期间,将消息重放到替换环境中。
  • 在计划内切换期间,在站点之间移动 backlog。

当你需要基于 exchange 的异步复制,而不是 queue 到 queue 传输时,请使用 使用 Federated Exchanges 的 RabbitMQ 灾难恢复

前提条件

在配置 Shovel 之前,请确保满足以下条件:

  1. 托管 Shovel 的 cluster 已启用 rabbitmq_shovel plugin。
  2. 当你需要通过 management UI 或 HTTP API 查看动态 Shovel 时,请启用 rabbitmq_shovel_management
  3. 源端和目标端 RabbitMQ 端点能够从托管 Shovel 的 cluster 访问到。
  4. 源端和目标端用户具有所需权限。
  5. 目标 exchange 或 queue 已经存在,或者你有明确的流程在迁移前创建它们。

启用 Shovel plugin

以下示例在运行 Shovel 的 cluster 上启用所需 plugin:

apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: rabbitmq-migrator
  namespace: <namespace>
spec:
  rabbitmq:
    additionalPlugins:
      - rabbitmq_shovel
      - rabbitmq_shovel_management

验证已启用的 plugin:

kubectl -n <namespace> exec rabbitmq-migrator-server-0 -- \
  rabbitmq-plugins list -e

操作步骤

1. 选择动态 Shovel 的主机 cluster

动态 Shovel 作为 RabbitMQ runtime parameter 存储。请在同时与源端和目标端 broker 具有可靠网络连接的 cluster 上创建 Shovel。

2. 创建动态 Shovel

以下示例将消息从源端 broker 上的 orders queue 移动到目标端 broker 上的 orders-migrated queue:

kubectl -n <namespace> exec rabbitmq-migrator-server-0 -- \
  rabbitmqctl set_parameter -p / shovel move-orders \
  '{
    "src-protocol":"amqp091",
    "src-uri":"amqp://<source-user>:<source-password>@<source-host>:5672/%2f",
    "src-queue":"orders",
    "dest-protocol":"amqp091",
    "dest-uri":"amqp://<dest-user>:<dest-password>@<dest-host>:5672/%2f",
    "dest-queue":"orders-migrated",
    "ack-mode":"on-confirm",
    "reconnect-delay":5,
    "delete-after":"never"
  }'

%2f 是 URL 编码后的默认 / virtual host。请将其替换为你实际使用的、经过 URL 编码的 virtual host 名称。如果用户名或密码包含保留的 URI 字符,也请一并进行 URL 编码。

在生产迁移中请使用 ack-mode="on-confirm",这样只有在目标端确认发布后,源端才会收到确认。

3. 检查 Shovel 状态

验证 Shovel 正在运行:

kubectl -n <namespace> exec rabbitmq-migrator-server-0 -- \
  rabbitmqctl shovel_status

你也可以检查 runtime parameter:

kubectl -n <namespace> exec rabbitmq-migrator-server-0 -- \
  rabbitmqctl list_parameters -p /

4. 验证消息迁移

检查源端和目标端 queue 深度:

rabbitmqadmin \
  --host <source-management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  list queues name messages consumers

rabbitmqadmin \
  --host <dest-management-host> \
  --port 15672 \
  --username <admin-user> \
  --password <admin-password> \
  --vhost / \
  list queues name messages consumers

5. 迁移完成后移除 Shovel

如果 Shovel 只是临时使用,请在迁移完成后将其移除:

kubectl -n <namespace> exec rabbitmq-migrator-server-0 -- \
  rabbitmqctl clear_parameter -p / shovel move-orders

运维说明

  • Shovel 迁移的是消息,而不是完整的 broker 状态。
  • 如果目标 queue 不存在,Shovel 将失败,除非所选目标 exchange 和路由行为能够创建预期路径。
  • 网络中断会按照配置的重连延迟触发重连尝试。
  • 对于长时间运行的迁移任务,请同时监控源端和目标端的 queue 增长、consumer 堆积量以及 broker 告警。

相关信息