Redis(2)——跳躍表

原文鏈接:https://mp.weixin.qq.com/s/gA8CUpk6BivLGrizRRv_Bw


一、跳躍表簡介

跳躍表(skiplist)是一種隨機化的數據結構,由 William Pugh 在論文《Skip lists: a probabilistic alternative to balanced trees》中提出,是一種可以與平衡樹媲美的層次化鏈表結構——查找、刪除、添加等操作都可以在對數期望時間下完成,以下是一個典型的跳躍表例子:

Redis(2)——跳躍表

我們在上一篇中提到了 Redis 的五種基本結構中,有一個叫做 有序列表 zset 的數據結構,它類似於 Java 中的 SortedSetHashMap 的結合體,一方面它是一個 set 保證了內部 value 的唯一性,另一方面又可以給每個 value 賦予一個排序的權重值 score,來達到 排序的目的。

它的內部實現就依賴了一種叫做 「跳躍列表」 的數據結構。

為什麼使用跳躍表

首先,因為 zset 要支持隨機的插入和刪除,所以它 不宜使用數組來實現,關於排序問題,我們也很容易就想到 紅黑樹/ 平衡樹 這樣的樹形結構,為什麼 Redis 不使用這樣一些結構呢?

  1. 性能考慮: 在高併發的情況下,樹形結構需要執行一些類似於 rebalance 這樣的可能涉及整棵樹的操作,相對來說跳躍表的變化只涉及局部 (下面詳細說);
  2. 實現考慮: 在複雜度與紅黑樹相同的情況下,跳躍表實現起來更簡單,看起來也更加直觀;

基於以上的一些考慮,Redis 基於 William Pugh 的論文做出一些改進後採用了 跳躍表 這樣的結構。

本質是解決查找問題

我們先來看一個普通的鏈表結構:

Redis(2)——跳躍表

我們需要這個鏈表按照 score 值進行排序,這也就意味著,當我們需要添加新的元素時,我們需要定位到插入點,這樣才可以繼續保證鏈表是有序的,通常我們會使用 二分查找法,但二分查找是有序數組的,鏈表沒辦法進行位置定位,我們除了遍歷整個找到第一個比給定數據大的節點為止 (時間複雜度 O(n)) 似乎沒有更好的辦法。

但假如我們每相鄰兩個節點之間就增加一個指針,讓指針指向下一個節點,如下圖:

Redis(2)——跳躍表

這樣所有新增的指針連成了一個新的鏈表,但它包含的數據卻只有原來的一半 (圖中的為 3,11)。

現在假設我們想要查找數據時,可以根據這條新的鏈表查找,如果碰到比待查找數據大的節點時,再回到原來的鏈表中進行查找,比如,我們想要查找 7,查找的路徑則是沿著下圖中標註出的紅色指針所指向的方向進行的:

Redis(2)——跳躍表

這是一個略微極端的例子,但我們仍然可以看到,通過新增加的指針查找,我們不再需要與鏈表上的每一個節點逐一進行比較,這樣改進之後需要比較的節點數大概只有原來的一半。

利用同樣的方式,我們可以在新產生的鏈表上,繼續為每兩個相鄰的節點增加一個指針,從而產生第三層鏈表:

Redis(2)——跳躍表

在這個新的三層鏈表結構中,我們試著 查找 13,那麼沿著最上層鏈表首先比較的是 11,發現 11 比 13 小,於是我們就知道只需要到 11 後面繼續查找,從而一下子跳過了 11 前面的所有節點。

可以想象,當鏈表足夠長,這樣的多層鏈表結構可以幫助我們跳過很多下層節點,從而加快查找的效率。

更進一步的跳躍表

跳躍表 skiplist 就是受到這種多層鏈表結構的啟發而設計出來的。按照上面生成鏈表的方式,上面每一層鏈表的節點個數,是下面一層的節點個數的一半,這樣查找過程就非常類似於一個二分查找,使得查找的時間複雜度可以降低到 O(logn)。

但是,這種方法在插入數據的時候有很大的問題。新插入一個節點之後,就會打亂上下相鄰兩層鏈表上節點個數嚴格的 2:1 的對應關係。如果要維持這種對應關係,就必須把新插入的節點後面的所有節點 (也包括新插入的節點) 重新進行調整,這會讓時間複雜度重新蛻化成O(n)。刪除數據也有同樣的問題。

skiplist 為了避免這一問題,它不要求上下相鄰兩層鏈表之間的節點個數有嚴格的對應關係,而是 為每個節點隨機出一個層數(level)。比如,一個節點隨機出的層數是 3,那麼就把它鏈入到第 1 層到第 3 層這三層鏈表中。為了表達清楚,下圖展示瞭如何通過一步步的插入操作從而形成一個 skiplist 的過程:

Redis(2)——跳躍表

從上面的創建和插入的過程中可以看出,每一個節點的層數(level)是隨機出來的,而且新插入一個節點並不會影響到其他節點的層數,因此,插入操作只需要修改節點前後的指針,而不需要對多個節點都進行調整,這就降低了插入操作的複雜度。

現在我們假設從我們剛才創建的這個結構中查找 23 這個不存在的數,那麼查找路徑會如下圖:

Redis(2)——跳躍表

二、跳躍表的實現

Redis 中的跳躍表由 server.h/zskiplistNode 和 server.h/zskiplist 兩個結構定義,前者為跳躍表節點,後者則保存了跳躍節點的相關信息,同之前的 集合 list 結構類似,其實只有 zskiplistNode 就可以實現了,但是引入後者是為了更加方便的操作:

<code>/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
// value
sds ele;
// 分值
double score;
// 後退指針

struct zskiplistNode *backward;
// 層
struct zskiplistLevel {
// 前進指針
struct zskiplistNode *forward;
// 跨度
unsigned long span;
} level[];
} zskiplistNode;

typedef struct zskiplist {
// 跳躍表頭指針
struct zskiplistNode *header, *tail;
// 表中節點的數量
unsigned long length;
// 表中層數最大的節點的層數
int level;
} zskiplist;/<code>

正如文章開頭畫出來的那張標準的跳躍表那樣。

隨機層數

對於每一個新插入的節點,都需要調用一個隨機算法給它分配一個合理的層數,源碼在 t_zset.c/zslRandomLevel(void) 中被定義:

<code>int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<zskiplist>}/<zskiplist>/<code>

直觀上期望的目標是 50% 的概率被分配到 Level 1,25% 的概率被分配到 Level 2,12.5% 的概率被分配到 Level 3,以此類推...有 2-63 的概率被分配到最頂層,因為這裡每一層的晉升率都是 50%。

Redis 跳躍表默認允許最大的層數是 32,被源碼中 ZSKIPLIST_MAXLEVEL 定義,當Level[0] 有 264 個元素時,才能達到 32 層,所以定義 32 完全夠用了。

創建跳躍表

這個過程比較簡單,在源碼中的 t_zset.c/zslCreate 中被定義:

<code>zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;

// 申請內存空間
zsl = zmalloc(sizeof(*zsl));
// 初始化層數為 1
zsl->level = 1;
// 初始化長度為 0
zsl->length = 0;
// 創建一個層數為 32,分數為 0,沒有 value 值的跳躍表頭節點
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);

// 跳躍表頭節點初始化
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
// 將跳躍表頭節點的所有前進指針 forward 設置為 NULL
zsl->header->level[j].forward = NULL;
// 將跳躍表頭節點的所有跨度 span 設置為 0
zsl->header->level[j].span = 0;
}
// 跳躍表頭節點的後退指針 backward 置為 NULL
zsl->header->backward = NULL;
// 表頭指向跳躍表尾節點的指針置為 NULL
zsl->tail = NULL;
return zsl;
}/<code>

即執行完之後創建瞭如下結構的初始化跳躍表:

Redis(2)——跳躍表

插入節點實現

這幾乎是最重要的一段代碼了,但總體思路也比較清晰簡單,如果理解了上面所說的跳躍表的原理,那麼很容易理清楚插入節點時發生的幾個動作 (幾乎跟鏈表類似):

  1. 找到當前我需要插入的位置 (其中包括相同 score 時的處理);
  2. 創建新節點,調整前後的指針指向,完成插入;

為了方便閱讀,我把源碼 t_zset.c/zslInsert 定義的插入函數拆成了幾個部分

<code>// 存儲搜索路徑
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// 存儲經過的節點跨度
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;/<code>

第二部分:搜索當前節點插入位置

<code>serverAssert(!isnan(score));
x = zsl->header;
// 逐步降級尋找目標節點,得到 "搜索路徑"
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 如果 score 相等,還需要比較 value 值
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 記錄 "搜索路徑"
update[i] = x;
}/<code>

討論: 有一種極端的情況,就是跳躍表中的所有 score 值都是一樣,zset 的查找性能會不會退化為 O(n) 呢?

從上面的源碼中我們可以發現 zset 的排序元素不只是看 score 值,也會比較 value 值 (字符串比較)

第三部分:生成插入節點

<code>/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();
// 如果隨機生成的 level 超過了當前最大 level 需要更新跳躍表的信息
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 創建新節點
x = zslCreateNode(level,score,ele);/<code>

第四部分:重排前向指針

<code>for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;

/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}/<code>

第五部分:重排後向指針並返回

<code>x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;/<code>

節點刪除實現

刪除過程由源碼中的 t_zset.c/zslDeleteNode 定義,和插入過程類似,都需要先把這個 "搜索路徑" 找出來,然後對於每個層的相關節點重排一下前向後向指針,同時還要注意更新一下最高層數 maxLevel,直接放源碼 (如果理解了插入這裡還是很容易理解的):

<code>/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}

/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise

* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}/<code>

節點更新實現

當我們調用 ZADD 方法時,如果對應的 value 不存在,那就是插入過程,如果這個 value 已經存在,只是調整一下 score 的值,那就需要走一個更新流程。

假設這個新的 score 值並不會帶來排序上的變化,那麼就不需要調整位置,直接修改元素的 score 值就可以了,但是如果排序位置改變了,那就需要調整位置,該如何調整呢?

從源碼 t_zset.c/zsetAdd 函數 1350 行左右可以看到,Redis 採用了一個非常簡單的策略:

<code>/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}/<code>

把這個元素刪除再插入這個,需要經過兩次路徑搜索,從這一點上來看,Redis 的 ZADD 代碼似乎還有進一步優化的空間。

元素排名的實現

跳躍表本身是有序的,Redis 在 skiplist 的 forward 指針上進行了優化,給每一個 forward 指針都增加了 span 屬性,用來 表示從前一個節點沿著當前層的 forward 指針跳到當前這個節點中間會跳過多少個節點。在上面的源碼中我們也可以看到 Redis 在插入、刪除操作時都會小心翼翼地更新 span 值的大小。

所以,沿著 "搜索路徑",把所有經過節點的跨度 span 值進行累加就可以算出當前元素的最終 rank 值了:

<code>/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element. */

unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
// span 累加
rank += x->level[i].span;
x = x->level[i].forward;
}

/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
return 0;
}/<code>


分享到:


相關文章: