#客户端连接
有多种连接方法供业务应用程序访问 RabbitMQ 实例。本节描述了如何使用 Java 和 Python 访问 RabbitMQ 实例。
#目录
#适用场景
| 访问方式 | 适用场景 |
|---|---|
| 集群内访问 | 集群内的应用程序通过 SVC 暴露的端口访问 RabbitMQ 实例。 |
| 集群外访问 | 集群外的应用程序通过 NodePort 的端口访问 RabbitMQ 实例。 通过适当的实例端口访问管理门户。 |
#连接到 RabbitMQ
先决条件
提示
确保在代码中将 <username> 和 <password> 替换为实例的实际账户信息。
确保在代码中将 <ip> 和 <port> 替换为实例的实际访问地址。如果是从集群外访问,可能会有多个访问地址;可以全部填写以增强连接的可靠性。
Java
Python
Java 访问示例如下:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;
public class ProducerTest {
public static void main(String[] args) throws Exception {
Address[] addresses = {
new Address(<ip 1>,<port 1>),
new Address(<ip 2>,<port 2>),
new Address(<ip 3>,<port 3>)
};
String username = "<username>";
String password = "<password>";
String vhostName = "/";
String exchangeName = "testExchange";
String exchangeType = "direct";
String queueName = "test";
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(username);
factory.setPassword(password);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(vhostName);
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection(addresses);
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
channel.queueBind(queueName, exchangeName, "BindingKeyTest");
for (int i = 0; i < 100; i++ ) {
String message = "Hello World!"+ i;
channel.basicPublish(exchangeName, "BindingKeyTest", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
}有关更多 Java 客户端的示例,请参考 Java 客户端 API 指南。
Python 访问示例如下:
import pika
import random
def on_message(channel, method_frame, header_frame, body):
print(method_frame.delivery_tag)
print(body)
print()
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
credentials = pika.PlainCredentials('<username>','<password>')
node1 = pika.ConnectionParameters(<ip 1>, <port 1>, '/', credentials)
node2 = pika.ConnectionParameters(<ip 2>, <port 2>, '/', credentials)
node3 = pika.ConnectionParameters(<ip 3>, <port 3>, '/', credentials)
all_endpoints = [node1,node2,node3]
while(True):
try:
print("连接中...")
random.shuffle(all_endpoints)
connection = pika.BlockingConnection(all_endpoints)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare('recovery-example', durable = False, auto_delete = True)
channel.basic_consume('recovery-example', on_message)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
break
except pika.exceptions.ConnectionClosedByBroker:
continue
except pika.exceptions.AMQPChannelError as err:
print("捕获到频道错误: {}, 停止...".format(err))
break
except pika.exceptions.AMQPConnectionError:
print("连接已关闭,正在重试...")
continue有关更多 pika 的示例,请参考 pika 的 GitHub 仓库。
有关其他客户端的使用,请参考 RabbitMQ 教程。