一篇詳解Redis -- 延時隊列

Redis的 list 數據結構常用來作為

異步消息隊列 使用,使用 rpush/lpush 操作 入隊 ,使用 lpop/rpop 來操作 出隊

一篇詳解Redis -- 延時隊列

> rpush my-queue apple banana pear
(integer) 3
> llen my-queue
(integer) 3
> lpop my-queue
"apple"
> llen my-queue
(integer) 2
> lpop my-queue
"banana"
> llen my-queue
(integer) 1
> lpop my-queue
"pear"
> llen my-queue
(integer) 0
> lpop my-queue
(nil)

空隊列

  1. 如果隊列為空,客戶端會陷入 pop的死循環空輪詢 不僅拉高了 客戶端的CPURedis的QPS 也會被拉高
  2. 如果空輪詢的客戶端有幾十個, Redis的慢查詢 也會顯著增加,可以嘗試讓客戶端線程 sleep 1s
  3. 但睡眠會導致消息的
    延遲增大 ,可以使用 blpop/brpop (blocking, 阻塞讀
  • 阻塞讀在隊列沒有數據時,會立即進入 休眠 狀態,一旦有數據到來,會立即被 喚醒消息延遲幾乎為0

空閒連接

  1. 如果線程一直阻塞在那裡,Redis的客戶端連接就成了 閒置連接
  2. 閒置過久, 服務器 一般會 主動斷開 連接, 減少閒置的資源佔用 ,此時 blpop/brpop 會 拋出異常

鎖衝突處理

  1. 分佈式鎖 加鎖失敗 的處理策略
  2. 直接拋出異常 ,通知用戶稍後重試
  3. sleep 後再重試
  4. 將請求轉移到 延時隊列 ,過一會重試
  5. 拋出異常
  6. 這種方式比較適合由 用戶直接發起 的請求
  7. sleep
  8. sleep會 阻塞 當前的消息處理線程,從而導致隊列的後續消息處理出現 延遲
  9. 如果 碰撞比較頻繁 ,sleep方案不合適
  10. 延時隊列
  11. 比較適合異步消息處理的場景,通過將當前衝突的請求轉移到另一個隊列 延後處理避免衝突

延時隊列

  1. 可以通過Redis的 zset 來實現延時隊列
  2. 將消息序列化成一個字符串作為zet的 value ,將該消息的 到期處理時間 作為 score
  3. 然後 多線程輪詢 zset獲取 到期的任務 進行處理
  • 多線程是為了保障 可用性 ,但同時要考慮 併發安全
    ,確保 任務不能被多次執行
public class RedisDelayingQueue {
@Data
@AllArgsConstructor
@NoArgsConstructor
private static class TaskItem {
private String id;
private T msg;
}
private Type taskType = new TypeReference<taskitem>>() {
}.getType();
private Jedis jedis;
private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void delay(T msg) {
TaskItem task = new TaskItem<>(UUID.randomUUID().toString(), msg);
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task));
}
public void loop() {
// 可以進一步優化,通過Lua腳本將zrangeByScore和zrem統一挪到Redis服務端進行原子化操作,減少搶奪失敗出現的資源浪費
while (!Thread.interrupted()) {
// 只取一條
Set<string> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) {
// zrem是多線程多進程爭奪任務的關鍵

TaskItem task = JSON.parseObject(s, taskType);
this.handleMsg(task.msg);
}
}
}
private void handleMsg(T msg) {
try {
System.out.println(msg);
} catch (Throwable ignored) {
// 一定要捕獲異常,避免因為個別任務處理問題導致循環異常退出
}
}
public static void main(String[] args) {
final RedisDelayingQueue<string> queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo");
Thread producer = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("zhongmingmao" + i);
}
}
};
Thread consumer = new Thread() {
@Override
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException ignored) {
}
}
}
/<string>
/<string>
/<taskitem>


分享到:


相關文章: