面試必知必會:聊聊高性能延時隊列應用?

寫在最前:(推薦大家閱讀一下下啦)

  • <strong>
  • <strong>
  • <strong>

延時隊列的應用場景:

下單後,30分鐘內未付款就自動取消訂單等; 支付後,24小時未評論自動好評; 在我們實際開發過程中,應用場景很多...

面試必知必會:聊聊高性能延時隊列應用?

基於Redis Zset 實現

實現原理

Redis由於其自身的Zset數據結構,也同樣可以實現延時的操作。 Zset本質就是Set結構上加了個排序的功能,除了添加數據value之外,還提供另一屬性score,這一屬性在添加元素時候可以指定,每次指定score後,Zset會自動重新按新的值調整順序

  1. 如果score代表的是想要執行時間的時間戳,在某個時間將它插入Zset集合中,它會按照時間戳大小進行排序,也就是對執行時間前後進行排序。
<code>> ZADD delay_queue 1581309229  taskId_1(integer) 1> ZADD delay_queue 1581309129  taskId_2(integer) 1> ZADD delay_queue 1581309329  taskId_3(integer) 1/<code>
  1. 不斷地進行取第一個key值,如果當前時間戳大於等於該key值的socre就將它取出來進行消費刪除,就可以達到延時執行的目的。 注意不需要遍歷整個Zset集合,以免造成性能浪費。
<code>> ZRANGE delay_queue 0 -1 withscores1) "taskId_2"2) "1581309129"3) "taskId_1"4) "1581309229"5) "taskId_3"6) "1581309329"/<code>

使用注意

  • 遍歷邏輯,刪除邏輯,注意使用 Redis Lua 封裝,確保原子性操作。更要注意 Redis Lua 在 Redis Cluster 的偽集群問題。
  • 若是JAVA 語言可以直接使用 redisson,封裝了 DelayedQueue 的實現。

源碼邏輯 org/redisson/RedissonDelayedQueue.java

面試必知必會:聊聊高性能延時隊列應用?


Beanstalkd 消息隊列

Beanstalkd,一個高性能、輕量級的分佈式內存隊列系統。支持過有9.5 million用戶的Facebook Causes應用。後來開源,現在有PostRank大規模部署和使用,每天處理百萬級任務。

部署使用

  • Linux 安裝 || docker 部署
<code>yum install beanstalkd||docker run -d -p 11300:11300 pig4cloud/beanstalkd/<code>
  • 客戶端使用,pom 依賴
<code><dependency>    <groupid>com.pig4cloud.beanstalk/<groupid>    <artifactid>beanstalkd-client-spring-boot-starter/<artifactid>    <version>0.0.1/<version>/<dependency>/<code>
  • 默認配置
面試必知必會:聊聊高性能延時隊列應用?

  • 代碼使用
<code>@Autowiredprivate JobProducer producer;/** * @param delay    是一個整形數,表示將job放入ready隊列需要等待的秒數 * @param ttr      time to run—是一個整形數,表示允許一個worker執行該job的秒數。這個時間將從一個worker 獲取一個job開始計算。 *                 如果該worker沒能在 秒內刪除、釋放或休眠該job,這個job就會超時,服務端會主動釋放該job。 *                 最小ttr為1。如果客戶端設置了0,服務端會默認將其增加到1。 * @param priority 優先級 0~2**32的整數,最高優先級是0 */@Testpublic void testSend() {String taskId = "1";// 業務對象信息    producer.putJob(0, 10, 10, taskId.getBytes());}/<code>
<code>@Componentpublic class DemoJobConsumer extends AbstractTubeConsumerListener {    @Override    public void work(JobConsumer consumer) {        // 阻塞多少秒獲取一次 Job        Job job = consumer.reserveJob(1000L);        // 消費此Job        consumer.deleteJob(job.getId());        // 執行延時的業務邏輯        String biz = new String(job.getData());    }}/<code>

擴展

  • 數據庫,利用定時任務輪詢實現,業務量大會性能瓶頸。
  • 延時隊列的其他實現,比如 rabbitmq 利用ttl特性可以實現。無法取消已放入隊列裡面的數據,使用時特別注意死信隊列的配置等。
  • 還可以自己根據 時間輪片的算法 自行實現 。
  • 總之一切,都要有補償的邏輯,無論是業務人員手動觸發還是自動補償。


作者:冷冷gg
原文鏈接:https://juejin.im/post/5e420d75518825497467e908


分享到:


相關文章: