Redis不僅可作為緩存服務器,還可用作消息隊列。它的列表類型天生支持用作消息隊列。如下圖所示:
由於Redis的列表是使用雙向鏈表實現的,保存了頭尾節點,所以在列表頭尾兩邊插取元素都是非常快的。
所以可以直接使用Redis的List實現消息隊列,只需簡單的兩個指令lpush和rpop或者rpush和lpop。簡單示例如下:
存放消息端(消息生產者):
<code>package org.yamikaze.redis.messsage.queue; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; import java.util.concurrent.TimeUnit; /** * 消息生產者 * @author yamikaze */public class Producer extends Thread { public static final String MESSAGE_KEY = "message:queue"; private Jedis jedis; private String producerName; private volatile int count; public Producer(String name) { this.producerName = name; init(); } private void init() { jedis = MyJedisFactory.getLocalJedis(); } public void putMessage(String message) { Long size = jedis.lpush(MESSAGE_KEY, message); System.out.println(producerName + ": 當前未被處理消息條數為:" + size); count++; } public int getCount() { return count; } @Override public void run() { try { while (true) { putMessage(StringUtils.generate32Str()); TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException e) { } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException{ Producer producer = new Producer("myProducer"); producer.start(); for(; ;) { System.out.println("main : 已存儲消息條數:" + producer.getCount()); TimeUnit.SECONDS.sleep(10); } }}/<code>
消息處理端(消息消費者):
<code>package org.yamikaze.redis.messsage.queue; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; /** * 消息消費者 * @author yamikaze */public class Customer extends Thread{ private String customerName; private volatile int count; private Jedis jedis; public Customer(String name) { this.customerName = name; init(); } private void init() { jedis = MyJedisFactory.getLocalJedis(); } public void processMessage() { String message = jedis.rpop(Producer.MESSAGE_KEY); if(message != null) { count++; handle(message); } } public void handle(String message) { System.out.println(customerName + " 正在處理消息,消息內容是: " + message + " 這是第" + count + "條"); } @Override public void run() { while (true) { processMessage(); } } public static void main(String[] args) { Customer customer = new Customer("yamikaze"); customer.start(); }}/<code>
但上述例子中消息消費者有一個問題存在,即需要不停的調用rpop方法查看List中是否有待處理消息。每調用一次都會發起一次連接,這會造成不必要的浪費。也許你會使用Thread.sleep()等方法讓消費者線程隔一段時間再消費,但這樣做有兩個問題:
1、如果生產者速度大於消費者消費速度,消息隊列長度會一直增大,時間久了會佔用大量內存空間。
2、如果睡眠時間過長,這樣不能處理一些時效性的消息,睡眠時間過短,也會在連接上造成比較大的開銷。
所以可以使用brpop指令,這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,於是消費端可以將processMessage可以改為這樣:
<code>public void processMessage() { /** * brpop支持多個列表(隊列) * brpop指令是支持隊列優先級的,比如這個例子中MESSAGE_KEY的優先級大於testKey(順序決定)。 * 如果兩個列表中都有元素,會優先返回優先級高的列表中的元素,所以這兒優先返回MESSAGE_KEY * 0表示不限制等待,會一直阻塞在這兒 */ List<string> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey"); if(messages.size() != 0) { //由於該指令可以監聽多個Key,所以返回的是一個列表 //列表由2項組成,1) 列表名,2)數據 String keyName = messages.get(0); //如果返回的是MESSAGE_KEY的消息 if(Producer.MESSAGE_KEY.equals(keyName)) { String message = messages.get(1); handle(message); } } System.out.println("=======================");}/<string>/<code>
然後可以運行Customer,清空控制檯,可以看到程序沒有任何輸出,阻塞在了brpop這兒。然後在打開Redis的客戶端,輸入指令client list,可以查看當前有兩個連接。
發佈/訂閱模式
Redis除了對消息隊列提供支持外,還提供了一組命令用於支持發佈/訂閱模式。
1、發佈
PUBLISH指令可用於發佈一條消息,格式 PUBLISH channel message
返回值表示訂閱了該消息的數量。
2、訂閱
SUBSCRIBE指令用於接收一條消息,格式 SUBSCRIBE channel
可以看到使用SUBSCRIBE指令後進入了訂閱模式,但沒有接收到publish發送的消息,這是因為只有在消息發出去前訂閱才會接收到。在這個模式下其他指令,只能看到回覆。回覆分為三種類型:
1、如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是第幾個訂閱的頻道?(理解成序號?)
2、如果為message(消息),第二個值為產生該消息的頻道,第三個值為消息
3、如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示當前客戶端的訂閱數量。
可以使用指令UNSUBSCRIBE退訂,如果不加參數,則會退訂所有由SUBSCRIBE指令訂閱的頻道。
Redis還支持基於通配符的消息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:
再試試推送消息會得到以下結果:
可以看到publish指令返回的是2,而訂閱端這邊接收了兩次消息。這是因為PSUBSCRIBE指令可以重複訂閱頻道。而使用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。同時PUNSUBSCRIBE指令通配符不會展開。
例如:PUNSUBSCRIBE * 不會匹配到 channel.*, 所以要取消訂閱channel.*就要這樣寫PUBSUBSCRIBE channel.*。
代碼示範如下:
<code>package org.yamikaze.redis.messsage.subscribe; import org.yamikaze.redis.messsage.queue.StringUtils;import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; /** * 消息發佈方 * @author yamikaze */public class Publisher { public static final String CHANNEL_KEY = "channel:message"; private Jedis jedis; public Publisher() { jedis = MyJedisFactory.getLocalJedis(); } public void publishMessage(String message) { if(StringUtils.isBlank(message)) { return; } jedis.publish(CHANNEL_KEY, message); } public static void main(String[] args) { Publisher publisher = new Publisher(); publisher.publishMessage("Hello Redis!"); }}/<code>
簡單的發送一個消息。
消息訂閱方:
<code>package org.yamikaze.redis.messsage.subscribe; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub; import java.util.concurrent.TimeUnit; /** * 消息訂閱方客戶端 * @author yamikaze */public class SubscribeClient { private Jedis jedis; private static final String EXIT_COMMAND = "exit"; public SubscribeClient() { jedis = MyJedisFactory.getLocalJedis(); } public void subscribe(String ...channel) { if(channel == null || channel.length <= 0) { return; } //消息處理,接收到消息時如何處理 JedisPubSub jps = new JedisPubSub() { /** * JedisPubSub類是一個沒有抽象方法的抽象類,裡面方法都是一些空實現 * 所以可以選擇需要的方法覆蓋,這兒使用的是SUBSCRIBE指令,所以覆蓋了onMessage * 如果使用PSUBSCRIBE指令,則覆蓋onPMessage方法 * 當然也可以選擇BinaryJedisPubSub,同樣是抽象類,但方法參數為byte[] */ @Override public void onMessage(String channel, String message) { if(Publisher.CHANNEL_KEY.equals(channel)) { System.out.println("接收到消息: channel : " + message); //接收到exit消息後退出 if(EXIT_COMMAND.equals(message)) { System.exit(0); } } } /** * 訂閱時 */ @Override public void onSubscribe(String channel, int subscribedChannels) { if(Publisher.CHANNEL_KEY.equals(channel)) { System.out.println("訂閱了頻道:" + channel); } } }; //可以訂閱多個頻道 當前線程會阻塞在這兒 jedis.subscribe(jps, channel); } public static void main(String[] args) { SubscribeClient client = new SubscribeClient(); client.subscribe(Publisher.CHANNEL_KEY); //並沒有 unsubscribe方法 //相應的也沒有punsubscribe方法 }}/<code>
先運行client,再運行Publisher進行消息發送,輸出結果:
總結:
使用Redis的List數據結構可以簡單迅速地做一個消息隊列,同時Redis提供的BRPOP和BLPOP等指令解決了頻繁調用Jedis的rpop和lpop方法造成的資源浪費問題。除此之外,Redis提供對發佈/訂閱模式的指令,可以實現消息傳遞、進程間通信。
閱讀更多 小紅修BUG 的文章