基於Redis實現簡單的延時消息隊列

基於Redis實現簡單的延時消息隊列

說到消息隊列相信作為開發人員的大家都不陌生,在實際的工作中我們可能在很多場景下都會用到消息隊列,消息隊列不僅僅是用於收發消息,而且也可以用於解耦我們的應用系統設計,在大型的應用系統或者分佈式應用系統中,我們必然會用到消息隊列。

總結下,消息隊列的應用場景一般有以下幾種場景:

  1. 異步處理任務;
  2. 應用系統解耦;
  3. 大流量削峰;
  4. 日誌處理系統;
  5. 消息通訊;


目前主流的消息隊列框架有:

  • Apache的ActiveMQ;
  • Erlang語言實現的RabbitMQ;
  • Apache的RocketMQ;
  • Apache的Kafka;

這幾種主流的消息隊列框架各有各的優勢,也有略微的不同。

基於Redis實現簡單的延時消息隊列

消息隊列比較


當然,消息隊列也有一些特殊的使用場景,比如:一些電商系統中,當用戶下單後,需要在規定的時間內對訂單發起支付,如果在規定的時間內沒有支付訂單,那麼該訂單將自動取消。這個問題的常規解決方案有兩個:

  1. 使用定時任務定時去掃描表,修改過期訂單的狀態。這種方案當然存在許多侷限性,當訂單數量不多,而且對系統性能要求不高的情況下可以考慮使用。這種方案也不夠優雅;
  2. 使用基於消息隊列的延時消息隊列,這種方案可以對整個消息隊列設置一個消息過期時間,也可以給每一個消息設置一個過期時間,這個時候消息的過期時間取決於設置的最小時間;

延時消息隊列我們可以採用上面所說的消息隊列框架去實現,也可以採用比較簡單的基於Redis的方式去實現,眾所周知Redis並不是一個消息隊列框架,但是Redis在某些應用場景下可以採用其高級特性為我們提供消息隊列的特性。


基於Redis實現簡單的延時消息隊列

Rdis在常規的應用場景下,我們使用它作為高速緩存框架,搜索百度百科我們可以發現,對Redis的定義如下:

Redis是一個key-value存儲系統。和Memcached類似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操作,而且這些操作都是原子性的。在此基礎上,redis支持各種不同方式的排序。與memcached一樣,為了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操作寫入追加的記錄文件,並且在此基礎上實現了master-slave(主從)同步。

Redis的最基本用法是我們使用它的Key-Value特性,存儲我們的熱點數據或者高頻訪問數據,來達到提高整個應用系統的吞吐量。

就像上面對Redis的定義,zset是對有序集合的操作,我們可以利用這一特性來實現我們的延時消息隊列功能。大致實現的原理如下:

Zset本質就是Set結構上加了個排序的功能,除了添加數據value之外,還提供另一屬性score,這一屬性在添加修改元素時候可以指定,每次指定後,Zset會自動重新按新的值調整順序。可以理解為有兩列字段的數據表,一列存value,一列存順序編號。操作中key理解為zset的名字,那麼對延時隊列又有何用呢?試想如果score代表的是想要執行時間的時間戳,在某個時間將它插入Zset集合中,它便會按照時間戳大小進行排序,也就是對執行時間前後進行排序,這樣的話,起一個死循環線程不斷地進行取第一個key值,如果當前時間戳大於等於該key值的socre就將它取出來進行消費刪除,就可以達到延時執行的目的, 注意不需要遍歷整個Zset集合,以免造成性能浪費。


有了實現原理,下面我們通過一個簡單的例子來演示下具體的操作過程,可能對基於Redis實現延時消息隊列這一主題有更好的理解,演示的源代碼鏈接在文章末尾附上。

新建工程:delay-message-queue-redis

pom.xml

<code> 

<

project

xmlns

=

"http://maven.apache.org/POM/4.0.0"

xmlns:xsi

=

"http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation

=

"http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"

>

<

modelVersion

>

4.0.0

modelVersion

>

<

parent

>

<

groupId

>

com.delay.message.queue

groupId

>

<

artifactId

>

delay-message-queue

artifactId

>

<

version

>

0.0.1

version

>

parent

>

<

groupId

>

com.delay.message.queue

groupId

>

<

artifactId

>

delay-message-queue-redis

artifactId

>

<

version

>

0.0.1

version

>

<

name

>

delay-message-queue-redis

name

>

<

description

>

Demo project for Spring Boot

description

>

<

properties

>

<

java.version

>

1.8

java.version

>

properties

>

<

dependencies

>

<

dependency

>

<

groupId

>

org.springframework.boot

groupId

>

<

artifactId

>

spring-boot-starter-data-redis

artifactId

>

dependency

>

<

dependency

>

<

groupId

>

org.springframework.boot

groupId

>

<

artifactId

>

spring-boot-starter-web

artifactId

>

dependency

>

<

dependency

>

<

groupId

>

org.springframework.boot

groupId

>

<

artifactId

>

spring-boot-devtools

artifactId

>

<

scope

>

runtime

scope

>

<

optional

>

true

optional

>

dependency

>

<

dependency

>

<

groupId

>

org.springframework.boot

groupId

>

<

artifactId

>

spring-boot-configuration-processor

artifactId

>

<

optional

>

true

optional

>

dependency

>

<

dependency

>

<

groupId

>

org.projectlombok

groupId

>

<

artifactId

>

lombok

artifactId

>

<

optional

>

true

optional

>

dependency

>

<

dependency

>

<

groupId

>

org.springframework.boot

groupId

>

<

artifactId

>

spring-boot-starter-test

artifactId

>

<

scope

>

test

scope

>

dependency

>

dependencies

>

<

build

>

<

plugins

>

<

plugin

>

<

groupId

>

org.springframework.boot

groupId

>

<

artifactId

>

spring-boot-maven-plugin

artifactId

>

plugin

>

plugins

>

build

>

project

>

/<code>

操作Redis我們使用RedisTemplate,簡單的配置如下:

<code> 

public

class

RedisConfig

{

public

RedisConnectionFactory

redisConnectionFactory

()

{

return

new

LettuceConnectionFactory(

new

RedisStandaloneConfiguration(

"10.0.0.50"

,

6379

)); }

public

RedisTemplate

redisTemplate

()

{ RedisTemplate redisTemplate =

new

RedisTemplate(); redisTemplate.setConnectionFactory(redisConnectionFactory());

return

redisTemplate; } }/<code>

定義我們的消息生產者:

<code> 4j
 

public

class

ProducerService

{

private

static

final

String QUEUE_NAME =

"delay_order_queue"

;

private

RedisTemplate redisTemplate;

public

void

produce

(String orderId,

long

expiredTime)

{ redisTemplate.opsForZSet().add(QUEUE_NAME, orderId, expiredTime);

long

length = redisTemplate.opsForZSet().size(QUEUE_NAME); } }/<code>

消息生產者主要完成的功能:以訂單的過期時間為元素(value)的得分,將數據添加到隊列中去;

定義我們的消息消費者:

<code>@Slf4j
@Service

public

class

ConsumerService

{

private

static

final

String QUEUE_NAME =

"delay_order_queue"

; @Autowired

private

RedisTemplate redisTemplate;

public

void

consume

()

{

while

(

true

) { Set

set

= redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME,

0

, System.currentTimeMillis(),

0

,

1

);

if

(

set

== null ||

set

.isEmpty()) {

try

{ Thread.sleep(

1000

); }

catch

(InterruptedException e) {

log

.error(

"InterruptedException"

, e); }

continue

; } String orderId =

set

.iterator().next();

if

(redisTemplate.opsForZSet().remove(QUEUE_NAME, orderId) >

0

) {

log

.info(

"order id:{} handle success"

, orderId);

long

length = redisTemplate.opsForZSet().size(QUEUE_NAME);

log

.info(

"[consume]{} length:{}"

, QUEUE_NAME, length); } } } }/<code>

消息消費者的主要功能如下:

  • 循環從Redis的Zset中拿取一個0<=score<=當前時間的元素(value);
  • 如果沒有取到值,則整個線程休眠一秒鐘;
  • 如果取到值,則從該隊列(key)刪除取到的元素(value),刪除的目的是防止一個數據被重複的消費;然後對取到的值進行後續的數據處理,這裡是將數據打印出來。

整個生產者和消費者我們已經實現了,下面我們通過Junit來生產消息和消費消息;

生產消息的過程:

<code> 4j
 
 (SpringJUnit4ClassRunner

.

class

)

public

class

ProducerServiceTest

{

private

ProducerService producerService;

public

void

produce

()

{ Random random =

new

Random(

1

);

for

(

int

i =

0

; i

10

; i++) { Calendar calendar = Calendar.getInstance();

int

time = random.nextInt(

100

); calendar.add(Calendar.SECOND, time); String orderId =

"order-id-"

+ i;

long

expired = calendar.getTimeInMillis(); log.info(

"order id:{}, expired:{}"

, orderId, time); producerService.produce(orderId, expired);

try

{ Thread.sleep(

500

); }

catch

(InterruptedException e) { log.error(

"InterruptedException"

, e); } } } }/<code>

為了方便我們觀察,我們採用在當前時間的基礎上加上一個隨機的時間作為訂單的過期時間,每產生一個訂單線程休眠0.5秒,總共產生10個訂單。

消息的消費過程:

<code>

@SpringBootTest

@RunWith

(SpringJUnit4ClassRunner.class) public class ConsumerServiceTest {

@Autowired

private ConsumerService consumerService;

@Test

public void consume() {

consumerService

.consume

(); } }/<code>

該過程比較簡單,它主要是啟動我們的循環函數從Redis中取數據。

先啟動我們的消息消費過程ConsumerServiceTest,然後再啟動我們的消息生產過程ProducerServiceTest,觀察日誌的打印輸出。

消息生產過程:


基於Redis實現簡單的延時消息隊列

消息生產過程

消息消費過程:


基於Redis實現簡單的延時消息隊列

消息消費過程

仔細對比我們可以很容易發現:

  1. order-id-5的過期時間最短,它也是最先被消費掉的,其次是order-id-7;
  2. 當生產消息的過程完成後共產生了10個消息,消息消費過程中,每消費一個消息,又沒有產生新的消息的時候,整個消息隊列的長度在變小;


基於Redis實現簡單的延時消息隊列

通過上面的演示,我們實現了基於Redis實現簡單的延時消息隊列,最主要我們使用了Redis的Zset特性來完成延時消息隊列的功能。

上面的演示在高併發場景下,可能會存在問題:

  • 高併發場景下對Redis的操作不夠原子性;
  • 不適合分佈式應用場景下使用;
  • 適合單體應用中延時消息隊列的使用;

基於上面所提到的問題,最好的解決方案是採用消息隊列框架去實現,例如:RabbitMQ給延時隊列統一設置消息過期時間,過期的消息將被路由到另一個隊列中;也可以給每一個消息設置一個過期時間,過期的消息同樣路由到另一個隊列中。


源代碼GitHub地址:

https://github.com/bq-xiao/delay-message-queue


不積跬步,無以至千里;不積小流,無以成江海!


分享到:


相關文章: