無鎖有序鍊錶的實現

2021-09-23 22:52:17 字數 4391 閱讀 5811

感謝同事【kevinlynx】在本站發表此文

無鎖有序鍊錶可以保證元素的唯一性,使其可用於雜湊表的桶,甚至直接作為乙個效率不那麼高的map。普通鍊錶的無鎖實現相對簡單點,因為插入元素可以在表頭插,而有序鍊錶的插入則是任意位置。

本文主要基於**high performance dynamic lock-free hash tables實現。

鍊錶的主要操作包含insertremove,先簡單實現乙個版本,就會看到問題所在,以下**只用作示例:

struct node_t ;

int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key)

pred = item;

item = item->next;

} *pred_ptr = pred;

*item_ptr = null;

return false;

}int l_insert(node_t *head, key_t key, value_t val)

new_item = (node_t*) malloc(sizeof(node_t));

new_item->key = key;

new_item->val = val;

new_item->next = item;

// a. 如果pred本身被移除了

if (cas(&pred->next, item, new_item))

free(new_item);}}

int l_remove(node_t *head, key_t key)

// b. 如果pred被移除;如果item也被移除

if (cas(&pred->next, item, item->next))

}}

l_find函式返回查詢到的前序元素和元素本身,**a和b雖然拿到了preditem,但在cas的時候,其可能被其他執行緒移除。甚至,在l_find過程中,其每乙個元素都可能被移除。問題在於,任何時候拿到乙個元素時,都不確定其是否還有效。元素的有效性包括其是否還在鍊錶中,其指向的記憶體是否還有效。

通過為元素指標增加乙個有效性標誌位,配合cas操作的互斥性,就可以解決元素有效性判定問題。

因為node_t放在記憶體中是會對齊的,所以指向node_t的指標值低幾位是不會用到的,從而可以在低幾位裡設定標誌,這樣在做cas的時候,就實現了dcas的效果,相當於將兩個邏輯上的操作變成了乙個原子操作。想象下引用計數物件的執行緒安全性,其內包裝的指標是執行緒安全的,但物件本身不是。

cas的互斥性,在若干個執行緒cas相同的物件時,只有乙個執行緒會成功,失敗的執行緒就可以以此判定目標物件發生了變更。改進後的**(**僅做示例用,不保證正確):

typedef size_t markable_t;

// 最低位置1,表示元素被刪除

#define has_mark(p) ((markable_t)p & 0x01)

#define mark(p) ((markable_t)p | 0x01)

#define strip_mark(p) ((markable_t)p & ~0x01)

int l_insert(node_t *head, key_t key, value_t val)

new_item = (node_t*) malloc(sizeof(node_t));

new_item->key = key;

new_item->val = val;

new_item->next = item;

// a. 雖然find拿到了合法的pred,但是在以下**之前pred可能被刪除,此時pred->next被標記

// pred->next != item,該cas會失敗,失敗後重試

if (cas(&pred->next, item, new_item))

free(new_item);

}return false;

}int l_remove(node_t *head, key_t key)

node_t *inext = item->next;

// b. 刪除item前先標記item->next,如果cas失敗,那麼情況同insert一樣,有其他執行緒在find之後

// 刪除了item,失敗後重試

if (!cas(&item->next, inext, mark(inext)))

// c. 對同乙個元素item刪除時,只會有乙個執行緒成功走到這裡

if (cas(&pred->next, item, strip_mark(item->next)))

}return false;

}int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key)

int d = key_cmp(item->key, key);

if (d >= 0)

pred = item;

item = item->next;

} *pred_ptr = pred;

*item_ptr = null;

return false;

}

haz_gethaz_set_ptr之類的函式是乙個hazard pointer實現,用於支援多執行緒下記憶體的gc。上面的**中,要刪除乙個元素item時,會標記item->next,從而使得insert時中那個cas不需要做任何調整。總結下這裡的執行緒競爭情況:

aba問題還是存在的,insert中:

if (cas(&pred->next, item, new_item))

如果cas之前,pred後的item被移除,又以相同的位址值加進來,但其value變了,此時cas會成功,但鍊錶可能就不是有序的了。pred->val < new_item->val > item->val

為了解決這個問題,可以利用指標值位址對齊的其他位來儲存乙個計數,用於表示pred->next的改變次數。當insert拿到pred時,pred->next中儲存的計數假設是0,cas之前其他執行緒移除了pred->next又新增回了item,此時pred->next中的計數增加,從而導致insertcas失敗。

// 最低位留作刪除標誌

#define mask ((sizeof(node_t) - 1) & ~0x01)

#define get_tag(p) ((markable_t)p & mask)

#define tag(p, tag) ((markable_t)p | (tag))

#define mark(p) ((markable_t)p | 0x01)

#define has_mark(p) ((markable_t)p & 0x01)

#define strip_mark(p) ((node_t*)((markable_t)p & ~(mask | 0x01)))

remove的實現:

/* 先標記再刪除 */

if (!cas(&sitem->next, inext, mark(inext)))

int tag = get_tag(pred->next) + 1;

if (cas(&pred->next, item, tag(strip_mark(sitem->next), tag)))

insert中也可以更新pred->next的計數。

無鎖的實現,本質上都會依賴於cas的互斥性。從頭實現乙個lock free的資料結構,可以深刻感受到lock free實現的tricky。最終**可以從這裡github獲取。**中為了簡單,實現了乙個不是很強大的hazard pointer,可以參考之前的博文。

基於雙向鍊錶實現無鎖佇列

由於鍊錶的特性,因此可以知道其進行新增元素或者刪除元素只需要進行節點的改變即可,同時不需要考慮擴容和縮容的問題,比較方便。那實現佇列,需要的是生產者和消費者的模型。其本質是執行進隊和出隊操作。下面的 源於 那麼怎樣才能實現乙個併發無鎖的佇列呢?首先需要考慮佇列是基於鍊錶的,因此我們能操作它的前驅節點...

無鎖雜湊表的實現

無鎖雜湊表 lock free hash table 可以提高多執行緒下的效能表現,但是因為實現乙個無鎖雜湊表本身的複雜度不小。ps 真正的複雜在於出錯之後的除錯,因為多執行緒下的除錯本身就很複雜,引入無鎖資料結構之後,傳統的看堆疊資訊和列印log都基本上沒有意義了。堆疊中的資料可能被併發訪問破壞,...

有序表的合併 煉表表實現

include using namespace std 函式結果狀態 define true 1 define false 0 define ok 1 define error 0 define infeasible 1 define overflow 2 define maxsize 100 st...