Redis學習筆記之十:Redis用作消息隊列



Redis不僅可作為緩存服務器,還可用作消息隊列。它的列表類型天生支持用作消息隊列。如下圖所示:

Redis學習筆記之十:Redis用作消息隊列

由於Redis的列表是使用雙向鏈表實現的,保存了頭尾節點,所以在列表頭尾兩邊插取元素都是非常快的。

所以可以直接使用Redis的List實現消息隊列,只需簡單的兩個指令lpush和rpop或者rpush和lpop。簡單示例如下:


存放消息端(消息生產者):

<code>package org.yamikaze.redis.messsage.queue; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; import java.util.concurrent.TimeUnit; /** * 消息生產者 * @author yamikaze */public class Producer extends Thread {     public static final String MESSAGE_KEY = "message:queue";    private Jedis jedis;    private String producerName;    private volatile int count;     public Producer(String name) {        this.producerName = name;        init();    }     private void init() {        jedis = MyJedisFactory.getLocalJedis();    }     public void putMessage(String message) {        Long size = jedis.lpush(MESSAGE_KEY, message);        System.out.println(producerName + ": 當前未被處理消息條數為:" + size);        count++;    }     public int getCount() {        return count;    }     @Override    public void run() {        try {            while (true) {                putMessage(StringUtils.generate32Str());                TimeUnit.SECONDS.sleep(1);            }        } catch (InterruptedException e) {         } catch (Exception e) {            e.printStackTrace();        }    }     public static void main(String[] args) throws InterruptedException{        Producer producer = new Producer("myProducer");        producer.start();         for(; ;) {            System.out.println("main : 已存儲消息條數:" + producer.getCount());            TimeUnit.SECONDS.sleep(10);        }    }}/<code>

消息處理端(消息消費者):

<code>package org.yamikaze.redis.messsage.queue; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; /** * 消息消費者 * @author yamikaze */public class Customer extends Thread{     private String customerName;    private volatile int count;    private Jedis jedis;     public Customer(String name) {        this.customerName = name;        init();    }     private void init() {        jedis = MyJedisFactory.getLocalJedis();    }     public void processMessage() {        String message = jedis.rpop(Producer.MESSAGE_KEY);        if(message != null) {            count++;            handle(message);        }    }     public void handle(String message) {        System.out.println(customerName + " 正在處理消息,消息內容是: " + message + " 這是第" + count + "條");    }     @Override    public void run() {        while (true) {            processMessage();        }    }     public static void main(String[] args) {        Customer customer = new Customer("yamikaze");        customer.start();    }}/<code>

但上述例子中消息消費者有一個問題存在,即需要不停的調用rpop方法查看List中是否有待處理消息。每調用一次都會發起一次連接,這會造成不必要的浪費。也許你會使用Thread.sleep()等方法讓消費者線程隔一段時間再消費,但這樣做有兩個問題:

1、如果生產者速度大於消費者消費速度,消息隊列長度會一直增大,時間久了會佔用大量內存空間。

2、如果睡眠時間過長,這樣不能處理一些時效性的消息,睡眠時間過短,也會在連接上造成比較大的開銷。

所以可以使用brpop指令,這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,於是消費端可以將processMessage可以改為這樣:

<code>public void processMessage() {    /**     * brpop支持多個列表(隊列)     * brpop指令是支持隊列優先級的,比如這個例子中MESSAGE_KEY的優先級大於testKey(順序決定)。     * 如果兩個列表中都有元素,會優先返回優先級高的列表中的元素,所以這兒優先返回MESSAGE_KEY     * 0表示不限制等待,會一直阻塞在這兒     */    List<string> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");    if(messages.size() != 0) {        //由於該指令可以監聽多個Key,所以返回的是一個列表        //列表由2項組成,1) 列表名,2)數據        String keyName = messages.get(0);        //如果返回的是MESSAGE_KEY的消息        if(Producer.MESSAGE_KEY.equals(keyName)) {            String message = messages.get(1);            handle(message);        }     }    System.out.println("=======================");}/<string>/<code>

然後可以運行Customer,清空控制檯,可以看到程序沒有任何輸出,阻塞在了brpop這兒。然後在打開Redis的客戶端,輸入指令client list,可以查看當前有兩個連接。

發佈/訂閱模式

Redis除了對消息隊列提供支持外,還提供了一組命令用於支持發佈/訂閱模式。

1、發佈

PUBLISH指令可用於發佈一條消息,格式 PUBLISH channel message

Redis學習筆記之十:Redis用作消息隊列

返回值表示訂閱了該消息的數量。

2、訂閱

SUBSCRIBE指令用於接收一條消息,格式 SUBSCRIBE channel

Redis學習筆記之十:Redis用作消息隊列

可以看到使用SUBSCRIBE指令後進入了訂閱模式,但沒有接收到publish發送的消息,這是因為只有在消息發出去前訂閱才會接收到。在這個模式下其他指令,只能看到回覆。回覆分為三種類型:

1、如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是第幾個訂閱的頻道?(理解成序號?)

2、如果為message(消息),第二個值為產生該消息的頻道,第三個值為消息

3、如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示當前客戶端的訂閱數量。

Redis學習筆記之十:Redis用作消息隊列

可以使用指令UNSUBSCRIBE退訂,如果不加參數,則會退訂所有由SUBSCRIBE指令訂閱的頻道。

Redis還支持基於通配符的消息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:

再試試推送消息會得到以下結果:

Redis學習筆記之十:Redis用作消息隊列

可以看到publish指令返回的是2,而訂閱端這邊接收了兩次消息。這是因為PSUBSCRIBE指令可以重複訂閱頻道。而使用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。同時PUNSUBSCRIBE指令通配符不會展開。

例如:PUNSUBSCRIBE * 不會匹配到 channel.*, 所以要取消訂閱channel.*就要這樣寫PUBSUBSCRIBE channel.*。

代碼示範如下:

<code>package org.yamikaze.redis.messsage.subscribe; import org.yamikaze.redis.messsage.queue.StringUtils;import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; /** * 消息發佈方 * @author yamikaze */public class Publisher {     public static final String CHANNEL_KEY = "channel:message";    private Jedis jedis;     public Publisher() {        jedis = MyJedisFactory.getLocalJedis();    }     public void publishMessage(String message) {        if(StringUtils.isBlank(message)) {            return;        }        jedis.publish(CHANNEL_KEY, message);    }     public static void main(String[] args) {        Publisher publisher = new Publisher();        publisher.publishMessage("Hello Redis!");    }}/<code>

簡單的發送一個消息。

消息訂閱方:

<code>package org.yamikaze.redis.messsage.subscribe; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub; import java.util.concurrent.TimeUnit; /** * 消息訂閱方客戶端 * @author yamikaze */public class SubscribeClient {     private Jedis jedis;    private static final String EXIT_COMMAND = "exit";     public SubscribeClient() {        jedis = MyJedisFactory.getLocalJedis();    }     public void subscribe(String ...channel) {        if(channel == null || channel.length <= 0) {            return;        }        //消息處理,接收到消息時如何處理        JedisPubSub jps = new JedisPubSub() {            /**             * JedisPubSub類是一個沒有抽象方法的抽象類,裡面方法都是一些空實現             * 所以可以選擇需要的方法覆蓋,這兒使用的是SUBSCRIBE指令,所以覆蓋了onMessage             * 如果使用PSUBSCRIBE指令,則覆蓋onPMessage方法             * 當然也可以選擇BinaryJedisPubSub,同樣是抽象類,但方法參數為byte[]             */            @Override            public void onMessage(String channel, String message) {                if(Publisher.CHANNEL_KEY.equals(channel)) {                    System.out.println("接收到消息: channel : " + message);                    //接收到exit消息後退出                    if(EXIT_COMMAND.equals(message)) {                        System.exit(0);                    }                 }            }             /**             * 訂閱時             */            @Override            public void onSubscribe(String channel, int subscribedChannels) {                if(Publisher.CHANNEL_KEY.equals(channel)) {                    System.out.println("訂閱了頻道:" + channel);                }            }        };        //可以訂閱多個頻道 當前線程會阻塞在這兒        jedis.subscribe(jps, channel);    }     public static void main(String[] args) {        SubscribeClient client = new SubscribeClient();        client.subscribe(Publisher.CHANNEL_KEY);        //並沒有 unsubscribe方法        //相應的也沒有punsubscribe方法    }}/<code>

先運行client,再運行Publisher進行消息發送,輸出結果:

Redis學習筆記之十:Redis用作消息隊列

總結:

使用Redis的List數據結構可以簡單迅速地做一個消息隊列,同時Redis提供的BRPOP和BLPOP等指令解決了頻繁調用Jedis的rpop和lpop方法造成的資源浪費問題。除此之外,Redis提供對發佈/訂閱模式的指令,可以實現消息傳遞、進程間通信。


分享到:


相關文章: