RabbitMQ入門教程(六):路由選擇Routing

簡介

本節主要演示使用直連接類型,將多個路由鍵綁定到同一個隊列上。也可以將同一個鍵綁定到多個隊列上(多重綁定multiple bindings),此時滿足鍵的隊列都能收到消息,不滿足的直接被丟棄。

RabbitMQ入門教程(六):路由選擇Routing

RabbitMQ入門教程(六):路由選擇Routing

生產者

<code>public class Producer {
@Test
public void testBasicPublish() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setUsername("mengday");
factory.setPassword("mengday");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Routing 的路由規則使用直連接
String EXCHANGE_NAME = "exchange.direct.routing";
String[] routingKeys = {"debug", "info", "warning", "error"};
for (int i = 0; i < 20; i++){
int random = (int)(Math.random() * 4);
String routingKey = routingKeys[random];
String message = "Hello RabbitMQ - " + routingKey + " - " + i;

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
}

// 關閉資源
channel.close();
connection.close();
}
}
/<code>

消費者1

<code>public class Consumer1 {
@Test
public void testBasicConsumer1() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setUsername("mengday");
factory.setPassword("mengday");

Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

String EXCHANGE_NAME = "exchange.direct.routing";
// 生成一個隨機的名稱,queueDeclare()方法沒有任何參數,當最後一個消費者斷開時就會刪除掉該隊列,當消費者結束後可以看到隊列就刪除了
String QUEUE_NAME = channel.queueDeclare().getQueue();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 在消費者端隊列綁定

// 將一個對列綁定多個路由鍵
String[] routingKeys = {"debug", "info"};
for (int i = 0; i < routingKeys.length; i++) {
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);
}

System.out.println("Consumer Wating Receive Message");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [C] Received '" + message + "', 處理業務中...");
}
};

channel.basicConsume(QUEUE_NAME, true, consumer);

Thread.sleep(1000000);
}
}
/<code>

消費者2

<code>public class Consumer2 {
@Test
public void testBasicConsumer2() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setUsername("mengday");
factory.setPassword("mengday");

Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();



String EXCHANGE_NAME = "exchange.direct.routing";
// 生成一個隨機的名稱
String QUEUE_NAME = channel.queueDeclare().getQueue();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 在消費者端隊列綁定

// 將一個對列綁定多個路由鍵
String[] routingKeys = {"warning", "error"};
for (int i = 0; i < routingKeys.length; i++) {
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);
}

System.out.println("Consumer Wating Receive Message");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [C] Received '" + message + "', 處理業務中...");
}
};

channel.basicConsume(QUEUE_NAME, true, consumer);

Thread.sleep(1000000);
}
}
/<code>

運行結果

RabbitMQ入門教程(六):路由選擇Routing

RabbitMQ入門教程(六):路由選擇Routing

RabbitMQ入門教程(六):路由選擇Routing

RabbitMQ入門教程(六):路由選擇Routing


分享到:


相關文章: