设计无锁并发数据结构——无锁结构实例

前言

 
为了演示一些在设计无锁数据结构中用到的技术,本节将给出一系列简单数据结构的无锁实现。
如前文所述,无锁结构依赖原子操作和相关的内存顺序保证来确保数据以正确的顺序对其他线程可见。在本节之初,所有的原子操作将使用默认的memory_order_seq_cst内存顺序,因为它最好理解(所有的memory_order_seq_cst操作构成一个全序),但后续将逐渐减少约束至memory_order_acquirememory_order_release,甚至memory_order_relaxed
虽然实例中没有直接使用互斥锁,但需要注意,仅有std::atomic_flag保证无锁实现。如C++内存模型与原子类型操作一章所述,在某些平台上C++标准库内的原子类型可能是基于锁实现的,此时倒不如选择直接使用基于锁的数据结构。


无锁线程安全栈

 
栈的最简底层结构是链表,其内部维持一个head节点指向当前栈顶。

push

在上述方案下,添加一个节点有步骤如下:

  1. 创建一个新节点。
  2. 设置它的next指针指向当前的head节点。
  3. 设置head节点指向新节点。

条件竞争分析与解决

单线程下上述步骤没有问题,但当有两个线程同时添加节点时,第2步与第3步之间会产生竞争条件:在线程1执行第2步读取head的值和第3步更新head的值之间,线程2可能会修改head的值——这会导致其他线程的修改被丢弃,或造成更加严重的后果。在解决这个竞争条件之前,还需要注意一点:一旦head更新并指向了新节点,另一个线程就能读到这个节点。因此,在head设置为指向新节点前,新节点必须准备就绪——此后将无法修改此节点。

如何解决该条件竞争?答案是在第3步时使用原子的比较/交换操作,来确保步骤2读取到head以来,不会对head进行修改;若有修改,则循环后重试。有push代码实例如下。

代码实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<typename T>
class lock_free_stack {
private:
struct node {
T data;
node *next;
node(T const &data_) : data(data_) {}
};
private:
std::atomic<node *> head;
public:
void push(T const &data) {
node *const new_node = new node(data);
new_node->next = head.load();
while (!head.compare_exchange_weak(new_node->next, new_node));
}
};

实例分析

在本实例中,使用compare_exchange_weak以规避条件竞争:

  • 若被存储到new_node->nexthead指针和之前load到的一样,head将被设置为new_node
  • 若返回false,意味着比较失败(head已被另一个线程修改),此时参数1(new_node->next)将更新为head的最新值,并试图再次执行该循环。

由于本次失败后将直接进行循环,因此我们使用比较/交换的weak版本而非strong版本,在某些架构上,前者效率优于后者(详见C++内存模型与原子类型操作——原子操作与原子类型一节)。

pop

pop的步骤在单线程下并不复杂:

  1. 读取head的当前值保存到node。
  2. 读取head->next。
  3. 设置head为head->next。
  4. 返回node中的数据。
  5. 删除node节点。

条件竞争分析与解决

但在多线程场景下,两个线程可能在步骤1中读取到相同的head值。存在一种场景:线程1处理到步骤5时,线程2还在处理步骤2,这将导致线程2试图解引用空悬指针——为了规避此问题,只能跳过步骤5,泄漏该节点,但问题并没有解决——这两个线程将返回同一个节点,这不符合栈语义。
解决问题的思路类似于push操作:在步骤3处使用比较/交换操作更新head。当比较/交换操作失败时,不是一个新节点已被推入,就是另一个线程弹出了节点。无论是哪种情况,都得返回步骤1(比较/交换操作会重新读取head)。一旦比较/交换成功,则当前线程是从栈上弹出指定节点的唯一线程,此后即可执行步骤4。

代码实例1

1
2
3
4
5
6
7
8
9
template<typename T> 
class lock_free_stack {
public:
void pop(T& result) {
node* old_head = head.load();
while(!head.compare_exchange_weak(old_head,old_head->next));
result=old_head->data;
}
};

实例1分析

实例1虽然仅有3行代码,但是问题却不少,以下将一一分析。
首先,这段代码在链表为空时将触发UB——当head是空指针时,尝试访问其next指针的行为未定义。解决方案很简单:在while循环内判空,并在空栈上抛出一个异常,或者pop函数返回一个bool 值来表明成功与否。
其次,这段代码存在异常安全问题。在前文中读者已经了解到拷贝对象作为返回值可能存在问题——若在拷贝返回值时如果抛出异常,该值将丢失(栈内没有,返回值也没有)。传入引用能解决该问题——至少异常发生时栈没有发生变更。但pop操作需要从栈上移除元素,因此通过引用获取返回值的方式并不可取——开发者依旧需要按值返回,那只能使用智能指针。智能指针意味着需要在堆上分配内存,但分配内存同样可能造成异常(此时又造成了数据丢失或pop失败),因此开发者可以在push内分配内存。

代码实例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template<typename T>
class lock_free_stack {
private:
struct node {
std::shared_ptr<T> data;
node *next;

node(T const &data_) : data(std::make_shared<T>(data_)) {}
};

std::atomic<node *> head;
public:
void push(T const &data) {
node *const new_node = new node(data);
new_node->next = head.load();
while (!head.compare_exchange_weak(new_node->next, new_node));
}

std::shared_ptr<T> pop() {
node *old_head = head.load();
while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
return old_head ? old_head->data : std::shared_ptr<T>();
}
};

实例2分析

需要注意一点,尽管该栈结构是无锁的,但并非无等待。因为只要compare_exchange_weak保持失败,push和pop函数中的while循环理论上可以无限循环下去。


在无锁数据结构中管理内存

 
在上一小节中,为了避免当线程A删除某节点的同时,线程B还持有指向该节点的指针并需要解引用导致竞争条件,我们选择了泄漏节点。对于C++开发者而言,内存泄漏是不可接受的,本小节将致力于解决该问题。

该问题的核心在于如何保证释放节点时没有其他线程持有该节点。可能访问指定节点的线程只有2种:

  1. 把节点添加到栈中的线程
  2. 调用pop的线程

所有节点均在push内创建,一旦节点入栈,push将不再触碰该节点,此时仅剩下调用pop的线程——如果只有一个这样的线程,那么删除该节点是安全的。

当然,并发场景下往往是同一个栈实例上存在多个线程调用pop,此时我们需要为节点写一个专用GC,虽然听起来很麻烦,但实际上也还算凑活:只需要检查哪些被pop访问到的节点即可。该GC的任务如下:将所有待删除节点推入队列,若当前无任何进程执行pop,则安全清除队列内所有节点。GC内部持有一个原子变量,用以统计当前进入pop的线程。根据以上需求,有pop修订版如下。

pop实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
template<typename T>
class lock_free_stack {
private:
std::atomic<unsigned> threads_in_pop;
void try_reclaim(node *old_head);

public:
std::shared_ptr<T> pop() {
++threads_in_pop; // 增加计数
node *old_head = head.load();
while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
std::shared_ptr<T> res;
if (old_head) {
res.swap(old_head->data); // 交换而非拷贝
}
try_reclaim(old_head); // 尝试回收删除的节点,递减计数
return res;
}
};

需要关注的点在于:数据的传出使用了swap而非单纯拷贝,原因很简单——旧节点已经无需再持有数据,如果拷贝的话,数据的引用计数+1,从而致使即使pop调用方已经完成数据使用,数据仍未能立即释放(压入待删除队列的节点依然持有数据)—— 本质上是分离了节点与数据的生命周期。

try_reclaim实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
template<typename T>
class lock_free_stack {
private:
std::atomic<node *> to_be_deleted;

static void delete_nodes(node *nodes) {
while (nodes) {
node *next = nodes->next;
delete nodes;
nodes = next;
}
}

void try_reclaim(node *old_head) {
if (threads_in_pop == 1) {
node *nodes_to_delete = to_be_deleted.exchange(nullptr);
if (0 == --threads_in_pop) { // 3
delete_nodes(nodes_to_delete);
} else if (nodes_to_delete) {
chain_pending_nodes(nodes_to_delete); // 2
}
delete old_head; // 1
} else {
chain_pending_node(old_head);
--threads_in_pop;
}
}

void chain_pending_nodes(node *nodes) {
// 获取链表尾部
node *last = nodes;
while (node * const next = last->next) {
last = next;
}
chain_pending_nodes(nodes, last);
}

// 更新待删除列表
void chain_pending_nodes(node *first, node *last) {
last->next = to_be_deleted;
while (!to_be_deleted.compare_exchange_weak(last->next, first));
}

void chain_pending_node(node *n) {
chain_pending_nodes(n, n);
}
};

实例分析

若当前仅有一个线程在pop内,则此时可安全删除待移出节点(步骤1)。若当前存在多个线程在pop内,删除任何节点均不安全,因此将当前节点添加至未决(pending)链表。

若当前仅有一个线程在pop内,需要尝试回收所有的未决节点(如前文所说,当前节点必然被回收)。首先通过原子exchange操作获取未决链表,并递减计数,若此时计数为0,则意味着可安全删除链表内所有内容(虽然删除时可能会出现新的未决节点)。若计数递减后不为0(此时存在其他线程在获取threads_in_pop和获取链表之间调用pop),则需要将未决节点加入未决链表内(步骤2),具体情形可见下图。

图示

如图,线程C添加节点Y到to_be_deleted链表,而此时线程B仍在通过old_head引用它,并且尝试读其next指针。因此线程A必须再次效验threads_in_pop,否则将误删节点。
sth
image.png-589.8kB

方案优劣

在低负载场景下,这一方案运行良好,因为总有时间点没有线程运行pop。但这些时间点稍纵即逝,这直接导致了:

  1. 回收前,需要效验threads_in_pop计数为0
  2. 在delete(步骤1)前执行效验

删除节点是一个耗时的操作,因此线程修改链表的时间窗口越小越好。从第一次发现threads_in_pop == 1到尝试删除节点耗费的时间越长,就越有可能有另一个线程会调用pop,导致此线程看见的threads_in_pop不再等于1,从而无法直接删除节点。

在高负载场景下,可能永远不会存在能回收节点的时间点,此时链表将会无限增长,导致再次泄漏内存。


使用风险指针检测不可回收的节点

 
“风险指针”这个术语引自Maged Michael发现的一项技术。其基本思想为,如果线程A准备访问线程B想要删除的对象,A将设置风险指针来引用此对象,然后通知其他线程使用这个指针是危险的。一旦不再需要此对象,则可清除风险指针。以划船比赛机制举例——比赛开始(删除对象)前,每个船上的舵手可以举手示意他们还没有准备好。只要有舵手举手,裁判就不能开始比赛。当所有舵手把手放下后,比赛才能开始,舵手可以在比赛开始前随意举手,无次数限制。

pop实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
std::shared_ptr <T> pop() {
std::atomic<void *> &hp = get_hazard_pointer_for_current_thread();
node *old_head = head.load();
do {
node *temp;
do
{
temp = old_head;
hp.store(old_head);
old_head = head.load();
} while (old_head != temp); // 一直循环,直到将风险指针设为head
} while (old_head && !head.compare_exchange_strong(old_head, old_head->next)); // 1 重新设置head
hp.store(nullptr); // 取出头结点后,清除风险指针
std::shared_ptr <T> res;
if (old_head) {
res.swap(old_head->data);
// 在删除之前检查是否有风险指针引用该节点(因为在上面那个内部循环中可能多个线程都把某个head设置为自己线程的风险指针)
if (outstanding_hazard_pointers_for(old_head))
{
reclaim_later(old_head); // 加入链表内,稍后删除
} else {
delete old_head;
}
delete_nodes_with_no_hazards();
}
return res;
}

实例分析

首先,设置风险指针的循环置于外部循环1内,该外部循环会在“比较/交换”操作失败的时候重新加载old_head,需要注意这里使用了compare_exchange_strong,因为在while循环内部有事可做(若使用compare_exchange_weak伪失败后,会导致不必要地重新设置风险指针)。若已成功取出节点,此时即可清除风险指针,并检查当前节点是否为其他线程的风险指针。若是,则置于链表内,留待后续回收;否则立刻删除节点。最后delete_nodes_with_no_hazards将检测所有由reclaim_later收集的节点,若当前没有任何风险指针引用这些节点,则可安全删除这些节点,否则留待下一个调用pop的线程继续检测。

get_hazard_pointer_for_current_thread的简单实现

get_hazard_pointer_for_current_thread的具体实现方案对程序逻辑影响不大。目前可以先设计一个简单(但不高效)的结构:一个定长数组,其元素类型是一个KV对——K是线程ID,V为风险指针。通过搜索这个数组来找到空闲槽位,并设置KV对。当线程退出时,重置ID为默认值,表征该槽位已空闲。具体实例如下。

get_hazard_pointer_for_current_thread实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
unsigned const max_hazard_pointers = 100;
struct hazard_pointer {
std::atomic <std::thread::id> id{};
std::atomic<void *> pointer{};
};
hazard_pointer hazard_pointers[max_hazard_pointers];

class hp_owner {
hazard_pointer *hp;
public:
hp_owner(hp_owner const &) = delete;
hp_owner operator=(hp_owner const &) = delete;

hp_owner() :
hp(nullptr) {
for (auto & hazard_pointer : hazard_pointers) {
std::thread::id old_id;
// 找到第一个空闲槽位
if (hazard_pointer.id.compare_exchange_strong(old_id, std::this_thread::get_id())) {
hp = &hazard_pointer;
break;
}
}
if (!hp) {
throw std::runtime_error("No hazard pointers available");
}
}

std::atomic<void *> &get_pointer() {
return hp->pointer;
}

~hp_owner() {
hp->pointer.store(nullptr);
hp->id.store(std::thread::id());
}
};

std::atomic<void *> &get_hazard_pointer_for_current_thread() {
thread_local static hp_owner hazard;
return hazard.get_pointer();
}

// 判断当前指针是否为风险指针
bool outstanding_hazard_pointers_for(void *p) {
for (unsigned i = 0; i < max_hazard_pointers; ++i) {
if (hazard_pointers[i].pointer.load() == p) {
return true;
}
}
return false;
}

实例分析

get_hazard_pointer_for_current_thread的实现如下:它有一个hp_owner类型的thread_local变量,用来存储当前线程的风险指针。每个线程第一次调用这个函数时,一个新的hp_owner实例被创建,该实例将通过compare_exchange_strong获取空闲槽位,停止搜索,若遍历完毕尚未找到空闲槽位,则抛出异常。显然,这种遍历对工作线程只需要一次,hp_owner承载了缓存的作用。当线程退出时其专属的hp_owner实例将被销毁,数组内将新增一个空闲槽位。

回收实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
template<typename T>
void do_delete(void *p) {
delete static_cast<T *>(p);
}

struct data_to_reclaim {
void *data;
std::function<void(void *)> deleter;
data_to_reclaim *next;

template<typename T>
data_to_reclaim(T *p): data(p), deleter(&do_delete<T>), next(nullptr) {}

~data_to_reclaim() { deleter(data); }
};

std::atomic<data_to_reclaim *> nodes_to_reclaim; // 回收链表

void add_to_reclaim_list(data_to_reclaim *node) {
node->next = nodes_to_reclaim.load();
while (!nodes_to_reclaim.compare_exchange_weak(node->next, node));
}

template<typename T>
void reclaim_later(T *data) {
add_to_reclaim_list(new data_to_reclaim(data));
}

void delete_nodes_with_no_hazards() {
data_to_reclaim *current = nodes_to_reclaim.exchange(nullptr);
while (current) {
data_to_reclaim *const next = current->next;
if (!outstanding_hazard_pointers_for(current->data)) {
delete current;
} else {
add_to_reclaim_list(current);
}
current = next;
}
}

实例分析

首先,以data_to_reclaim作为链表内的元素类型,其构造函数是一个函数模板(主要是为了支持各类型的deleter)。reclaim_later的职责很简单——将该指针构建为data_to_reclaim类型后添加至链表内。

delete_nodes_with_no_hazards使用exchange函数获取需要回收的整个链表。这个简单但很关键的步骤确保了只有一个线程回收该链表。其他线程现在自由地将其他节点添加到新回收链表中,甚至尝试对节点进行回收,而不影响本回收线程。当获取到链表头后,遍历该链表内节点,若该节点为非风险指针,则可安全删除,否则将该节点重新置于回收链表内(已不再是本链表)。

虽然这个简单的实现确实安全地回收了被删除的节点,不过开销极大。每次pop调用都需要扫描一次风险指针数组,这意味着检查max_hazard_pointers个原子变量。原子操作本来就慢(在台式CPU上,原子操作比非原子操作慢100倍)。每一次调用outstanding_hazard_pointers_for时开发者需要意识到:可能会有max_hazard_pointers个节点在链表中,并且它们需要和max_hazard_pointers个存储的风险指针做比较。是时候做性能优化了。

优化思路

上文只是描述了一个简单的风险指针实现,用以辅助解释该技术。以下将简单介绍一些优化点。
首先可以用内存换性能。不同于每次调用pop都检查回收链表上的每个节点,如果一直等到2×max_hazard_pointers个节点在链表中时,由于最多有max_hazard_pointers个节点是活跃的,即可保证至少有max_hazard_pointers个节点可以被回收,并且再次尝试回收任意节点前,至少会对pop有 max_hazard_pointer次调用。比起每次pop调用检查大约max_hazard_pointers个节点(不一定能回收到节点),每max_hazard_pointers次pop调用,检查2×max_hazard_pointers个节点,就会有max_hazard_pointers个节点可回收。这等价于pop调用检查两个节点,其中有一个被回收。
不过,这个方案不仅使得回收链表增加了内存使用,同时还增加了计数需求,这意味着需要使用原子计数器,并且有多线程争竞争访问回收链表本身。当然,如果内存充裕,可以考虑令每个线程通过线程局部变量,持有自己的回收链表,如果一个线程在它的所有节点被回收前退出,它的本地链表可以像之前一样存储到全局链表中,然后添加到下一个线程的本地回收链表内,令该线程执行回收操作。


使用引用计数检测节点是否在使用

 
区别于风险指针通过把使用中的节点存储到链表中,引用计数通过统计每个节点上访问的线程数量来判断当前节点是否正在被使用。

使用std::shared_ptr

理论上我们可以直接使用std::shared_ptr<>完成引用计数,但需要注意一点,在部分平台上,std::shared_ptr<>并不保证无锁,尽管其部分操作能够保证原子性。毕竟标准库中的std::shared_ptr<>旨在广泛用于多种上下文内,令其原子操作无锁可能会带来额外的开销。

若我们运气足够好,本平台std::atomic_is_lock_free(&some_shared_ ptr)返回true,那实现就简单多了,具体实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
template<typename T>
class lock_free_stack {
private:
struct node {
std::shared_ptr<T> data;
std::shared_ptr<node> next;

node(T const &data_) :
data(std::make_shared<T>(data_)) {}
};

std::shared_ptr<node> head;
public:
void push(T const &data) {
std::shared_ptr<node> const new_node = std::make_shared<node>(data);
new_node->next = std::atomic_load(&head);
while (!std::atomic_compare_exchange_weak(&head,
&new_node->next, new_node));
}

std::shared_ptr<T> pop() {
std::shared_ptr<node> old_head = std::atomic_load(&head);
while (old_head && !std::atomic_compare_exchange_weak(&head,
&old_head, old_head->next));
if (old_head) {
std::atomic_store(&old_head->next, std::shared_ptr<node>()); // 避免当最后一个std::shared_ptr引用的给定节点被销毁时,删除后续数据,因此需要清空next指针
return old_head->data;
}
return std::shared_ptr<T>();
}

~lock_free_stack() {
while (pop());
}
};

手动管理引用计数

一种技术是对每个节点使用两个而不是一个引用计数:一个内部计数和一个外部计数,两个值的和就是对这个节点的引用总数。
外部计数与指向节点的指针一起保存,并且每次读取指针的时候外部计数增加。当读线程使用完节点后,递减内部计数。一个读指针操作完成后,外部计数将加1,内部计数会减1。
当不再需要外部计数&&指针时(该节点不再位于可以被多个线程访问的位置),内部计数将加上外部计数-1的值,并丢弃外部计数。若此时内部计数等于0,则认为没有对该节点的引用,可以将该节点安全的删除。

代码实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
template<typename T>
class lock_free_stack {
private:
struct node;
struct counted_node_ptr {
int external_count;
node *ptr;
};

struct node {
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;

node(T const &data_) : data(std::make_shared<T>(data_)), internal_count(0) {}
};

std::atomic<counted_node_ptr> head;
public:
~lock_free_stack() {
while (pop());
}

void push(T const &data) {
counted_node_ptr new_node;
new_node.ptr = new node(data);
new_node.external_count = 1;
new_node.ptr->next = head.load();
while (!head.compare_exchange_weak(new_node.ptr->next, new_node));
}
private:
void increase_head_count(counted_node_ptr &old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count;
} while (!head.compare_exchange_strong(old_counter, new_counter));
old_counter.external_count = new_counter.external_count;
}

public:
std::shared_ptr<T> pop() {
counted_node_ptr old_head = head.load();
for (;;) {
increase_head_count(old_head);
node *const ptr = old_head.ptr;
if (!ptr) {
// 理论上需要减少该节点的外部计数
return std::shared_ptr<T>();
}
if (head.compare_exchange_strong(old_head, ptr->next)) {
std::shared_ptr<T> res;
res.swap(ptr->data);
int const count_increase = old_head.external_count - 2;
if (ptr->internal_count.fetch_add(count_increase) == -count_increase) { // 引用计数和为0
delete ptr;
}
return res;
}
// 该节点已不再是head节点,需要重新读取,此时令该节点内部计数-1
if (ptr->internal_count.fetch_sub(1) == 1) { // 节点已被删除
delete ptr;
}
}
}
};

实例解析

首先,外部计数和节点指针一起包装在counted_node_ptr结构中,并作为node结构的next指针,同时node内持有内部计数。因为counted_node_ptr是一个简单的结构体,因此可以和std::atomic<>模板一起用作链表的head。在支持“双字比较和交换”(double-word-compare-and-swap)操作的平台上,由于这个结构体足够小,因此std::atomic支持无锁。若当前平台不支持的话,则只能使用std::shared_ptr作为引用计数工具。当类型的体积对平台的原子指令来讲太大的话,std::atomic<>将使用互斥锁来保证其操作的原子性(使“无锁”算法退化为“基于锁”的算法)。又或者,限制计数器大小,将其填充至指针内空余的bit位(比如,地址空间只有48位,而一个指针占64位。例如intel平台),这样就可以塞进一个机器字当中。

push很简单——构造了一个counted_node_ptr实例,引用新分配出来带有相关数据的node,并将node的next指针设置为当前head。此时internal_count为0,external_count为1(这是一个新节点,当前只有一个外部引用指向它,也就是head指针本身)。

pop操作相对复杂一些。一旦加载了head的值,首先必须增加对head节点的外部引用计数,以表明正在引用这个节点并且确保解引用是安全的。如果在引用计数增加前解引用指针,另一个线程可能在你访问这个节点之前释放它,从而使你持有一个空悬指针。这就是使用内外引用计数的主要原因:通过增加外部引用计数,保证了指针在访问期间是有效的。递增操作通过compare_exchange_strong循环完成,该循环保证了指针不会在同一时间内被另一个线程修改。
一旦计数增加,则可安全地解引用head值的ptr字段,以便访问指向的节点。如果指针为空,说明此时链表为空,否则尝试对head调用compare_exchange_strong以移除该节点。

  1. 若compare_exchange_strong成功,则意味着此时该线程掌握了节点所有权,可置换出data数据。此后使用原子操作fetch_add将外部计数加到内部计数。若此时引用计数为0,那么之前的值(fetch_add返回值)则为增加值的负数,此时可安全删除节点。需要注意的是,增加的值要比外部引用计数少2——节点已经从链表中删除,需要将外部计数-1,并且由于不再从这个线程访问节点,因此内部计数-1。无论是否删除节点,操作都已经完成,因此直接返回数据。
  2. 若compare_exchange_strong失败,则说明另一个线程已移除了该节点,或者另一个线程添加了一个新的节点到栈中。无论是何种情况,都需要用compare_exchange_strong返回的新的head值再次启动操作。不过,首先需要递减尝试移除的节点上的引用计数,因为该线程不会再访问此节点。如果当前线程是最后一个持有引用(另一个线程已经将这个节点从栈上移除了)的线程,此时内部引用为1,减1后为0,此时判定节点可安全删除。

应用内存模型顺序至无锁栈

 
目前为止,本文实例一直使用默认的std::memory_order_seq_cst内存顺序。在大多数系统上,std::memory_order_seq_cst在执行时间和同步开销方面比其他内存顺序更为昂贵,也许是时候做进一步优化了~

在对内存顺序做优化之前,首先需要明确数据结构的使用场景,
在修改内存顺序之前,定提供所需关系的最小内存顺序。为了做到这一点,需要在几种不同的场景中从线程的视 角查看相关情况。最简单的场景是一个线程将数据项推入栈,然后另一个线程在一段时间 过后弹出数据项,因此我们将从这开始。