簡介
本節主要演示如何使用直連(direct)類型的交換器,將多個路由鍵綁定到同一個隊列上。也可以將同一個路由鍵綁定到多個隊列上(多重綁定,multiple bindings),此時所有綁定了該路由鍵的隊列都能收到消息;若消息的路由鍵沒有匹配任何綁定,該消息會被直接丟棄。
生產者
<code>public class Producer {
/**
 * Publishes 20 messages to the direct exchange {@code exchange.direct.routing},
 * each with a routing key picked at random from debug / info / warning / error.
 *
 * <p>The connection and channel are opened in a try-with-resources block so they
 * are closed even when a publish fails part-way through the loop.
 *
 * @throws IOException if connecting, declaring the exchange, publishing or closing fails
 * @throws TimeoutException if opening the connection or closing the channel times out
 */
@Test
public void testBasicPublish() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    factory.setPort(AMQP.PROTOCOL.PORT);
    factory.setUsername("mengday");
    factory.setPassword("mengday");

    // Routing is done through a direct exchange.
    String exchangeName = "exchange.direct.routing";
    String[] routingKeys = {"debug", "info", "warning", "error"};

    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // Declare the exchange here as well, so publishing does not depend on a
        // consumer having created it first. Same type as the consumers declare.
        channel.exchangeDeclare(exchangeName, "direct");

        for (int i = 0; i < 20; i++) {
            // Index is derived from the array length so the key list can change freely.
            int random = (int) (Math.random() * routingKeys.length);
            String routingKey = routingKeys[random];
            String message = "Hello RabbitMQ - " + routingKey + " - " + i;
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8"));
        }
    }
}
}
</code>
消費者1
<code>public class Consumer1 {
/**
 * Binds one server-named queue to the direct exchange {@code exchange.direct.routing}
 * under the routing keys "debug" and "info", then prints every message delivered to it.
 *
 * <p>The connection and channel are opened in a try-with-resources block so they
 * are released when setup fails or when the waiting period ends.
 *
 * @throws Exception if any broker operation fails or the waiting thread is interrupted
 */
@Test
public void testBasicConsumer1() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    factory.setPort(AMQP.PROTOCOL.PORT);
    factory.setUsername("mengday");
    factory.setPassword("mengday");

    String exchangeName = "exchange.direct.routing";

    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // The no-argument queueDeclare() lets the broker generate the queue name.
        // Per the original author's note, the queue is removed once its last
        // consumer disconnects, so it disappears after this consumer finishes.
        String queueName = channel.queueDeclare().getQueue();
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        // Binding is done on the consumer side: one queue, several routing keys.
        String[] routingKeys = {"debug", "info"};
        for (String routingKey : routingKeys) {
            channel.queueBind(queueName, exchangeName, routingKey);
        }

        System.out.println("Consumer Wating Receive Message");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [C] Received '" + message + "', 處理業務中...");
            }
        };

        // Second argument true = automatic acknowledgement.
        channel.basicConsume(queueName, true, consumer);

        // Keep the test method from returning so the consumer stays registered.
        Thread.sleep(1000000);
    }
}
}
</code>
消費者2
<code>public class Consumer2 {
/**
 * Binds one server-named queue to the direct exchange {@code exchange.direct.routing}
 * under the routing keys "warning" and "error", then prints every message delivered to it.
 *
 * <p>The connection and channel are opened in a try-with-resources block so they
 * are released when setup fails or when the waiting period ends.
 *
 * @throws Exception if any broker operation fails or the waiting thread is interrupted
 */
@Test
public void testBasicConsumer2() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    factory.setPort(AMQP.PROTOCOL.PORT);
    factory.setUsername("mengday");
    factory.setPassword("mengday");

    String exchangeName = "exchange.direct.routing";

    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // The no-argument queueDeclare() lets the broker generate the queue name.
        String queueName = channel.queueDeclare().getQueue();
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);

        // Binding is done on the consumer side: one queue, several routing keys.
        String[] routingKeys = {"warning", "error"};
        for (String routingKey : routingKeys) {
            channel.queueBind(queueName, exchangeName, routingKey);
        }

        System.out.println("Consumer Wating Receive Message");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [C] Received '" + message + "', 處理業務中...");
            }
        };

        // Second argument true = automatic acknowledgement.
        channel.basicConsume(queueName, true, consumer);

        // Keep the test method from returning so the consumer stays registered.
        Thread.sleep(1000000);
    }
}
}
</code>
運行結果
閱讀更多 Java實用技術 的文章
關鍵字: public routingKeys EXCHANGE