设计基于锁的并发数据结构——更复杂的数据结构

前言

 
上一节所提及的栈和队列均是非常简单的数据结构,使用场景有限。本节将针对【查找表】,【链表】两种使用场景较丰富的数据结构,提供设计实例。


查找表

设计前提与使用场景

std::map是STL提供的现成查找表,其接口重度基于迭代器。从并发视角看,迭代器的线程安全性很难(并非不可能)保证,比如另一个线程可能正在删除本线程迭代器所引用的元素,此时很难验证迭代器是否已经失效。因此本次数据结构接口设计并不基于STL传统思想。

不同于栈和队列,查找表的主要使用场景是查找指定元素,而非修改表内数据(例如前文提及的DNS缓存)。本次设计的查找表具备几个基本操作:

  • 添加一个新的键/值对
  • 修改给定键对应的值
  • 删除一个键及其关联的值
  • 通过给定键,获取对应的值(如有)

除此以外还可以考虑增加一些全容器范围的操作,例如检查容器是否为空,提供当前完整键的快照,提供当前完整键值对的快照等。

设计思路

条件竞争

查找表接口的固有条件竞争在于:若两个线程同时试图添加key相同的KV对,则只有一个线程能够添加成功,因此必须修改接口,合并添加修改为同一个成员函数。

为了提高并发性,可以使用std::shared_mutex,保证支持多线程读取,单线程写入。

底层数据结构

实现查找表的底层数据结构一般有三种;

  1. 二叉树(AVL或红黑树)
  2. 有序数组
  3. 哈希表

二叉树并没有为扩展并发提供太多可能,每一个查找或者修改操作都需要从访问根节点开始,因此,根节点需要上锁。虽然随着访问线程沿着树向下移动,这个锁可以释放,但相比横跨整个数据结构的单一锁,提升并不大。有序数组则更不适合,每一次修改或查找都必须锁住整个数组,毫无并发性可言。

对于哈希表而言,一个键属于哪个桶纯粹由键的属性以及哈希函数决定,每个桶都可以有一个独立的锁。若桶的数量为N,则并发访问的可能性约为数组的N倍(虽然需要挑选一个好的哈希函数)。

代码实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
template<typename Key, typename Value, typename Hash=std::hash<Key> >
class threadsafe_lookup_table {
private: // member class
class bucket_type {
private:
using bucket_value = std::pair<Key, Value>;
using bucket_data = std::list<bucket_value>;
using bucket_iterator = typename bucket_data::iterator;
using bucket_const_iterator = typename bucket_data::const_iterator;
private:
bucket_data data;
mutable std::shared_mutex mutex; // C++17
private:
bucket_const_iterator find_entry_for(Key const &key) const {
return std::find_if(data.begin(), data.end(), [&](bucket_value const &item) { return item.first == key; });
}

bucket_iterator find_entry_for(Key const &key) {
return std::find_if(data.begin(), data.end(), [&](bucket_value const &item) { return item.first == key; });
}

public:
Value value_for(Key const &key, Value const &default_value) const {
std::shared_lock<std::shared_mutex> lock(mutex);
bucket_const_iterator const found_entry = find_entry_for(key);
return (found_entry == data.end()) ? default_value : found_entry->second;
}

void add_or_update_mapping(Key const &key, Value const &value) {
std::unique_lock<std::shared_mutex> lock(mutex);
bucket_iterator found_entry = find_entry_for(key);
if (found_entry == data.end()) {
data.push_back(bucket_value(key, value));
} else {
found_entry->second = value;
}
}

void remove_mapping(Key const &key) {
std::unique_lock<std::shared_mutex> lock(mutex);
bucket_iterator found_entry = find_entry_for(key);
if (found_entry != data.end()) {
data.erase(found_entry);
}
}
};

private:
std::vector<std::unique_ptr<bucket_type>> buckets;
Hash hasher;

private:
bucket_type &get_bucket(Key const &key) const {
std::size_t const bucket_index = hasher(key) % buckets.size();
return *buckets[bucket_index];
}

public:
explicit threadsafe_lookup_table(unsigned num_buckets = 19, Hash const &hasher_ = Hash()) :
buckets(num_buckets), hasher(hasher_) {
for (unsigned i = 0; i < num_buckets; ++i) {
buckets[i].reset(new bucket_type);
}
}

threadsafe_lookup_table(threadsafe_lookup_table const &other) = delete;

threadsafe_lookup_table &operator=(threadsafe_lookup_table const &other) = delete;

Value value_for(Key const &key, Value const &default_value = Value()) const {
return get_bucket(key).value_for(key, default_value);
}

void add_or_update_mapping(Key const &key, Value const &value) {
get_bucket(key).add_or_update_mapping(key, value);
}

void remove_mapping(Key const &key) {
get_bucket(key).remove_mapping(key);
}

// 获取快照
std::map<Key, Value> get_map() const {
std::vector<std::unique_lock<std::shared_mutex>> locks;
// 以相同顺序加锁,确保不会死锁
for (unsigned i = 0; i < buckets.size(); ++i) {
locks.push_back(std::unique_lock<std::shared_mutex>(buckets[i]->mutex));
}
std::map<Key, Value> res;
for (unsigned i = 0; i < buckets.size(); ++i) {
for (auto it = buckets[i]->data.cbegin(); it != buckets[i]->data.cend(); ++it) {
res.insert(*it);
}
}
return res;
}
};

链表

设计前提与使用场景

如前文所述,本次接口设计依然避免基于迭代器(STL风格迭代器生命周期不受容器控制,很难确保线程安全性)。

链表的常用场景有以下几点:

  • 添加一个项到链表
  • 从链表中删除满足某个条件的项
  • 从链表中查找满足某个条件的项
  • 更新链表中满足某个条件的项
  • 拷贝链表中的每一项到另一个容器中

基本设计思想

令链表的每一个节点持有一个互斥锁,即可最大程度实现并发性(尽管互斥锁个数将随着链表长度增长而增长),每个操作仅需要锁住它需要的节点,并在关注点转向其他节点时释放该锁。

代码实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
template<typename T>
class threadsafe_list {
private: // member class
struct node {
std::mutex m;
std::shared_ptr <T> data;
std::unique_ptr <node> next;

node() = default;

explicit node(T const &value) : data(std::make_shared<T>(value)) {}
};
private:
node head; // 哨兵节点
public:
threadsafe_list() = default;

~threadsafe_list() { remove_if([](node const &) { return true; }); }

threadsafe_list(threadsafe_list const &other) = delete;

threadsafe_list &operator=(threadsafe_list const &other) = delete;

public:
void push_front(T const &value) {
std::unique_ptr <node> new_node(new node(value));
std::lock_guard <std::mutex> lk(head.m);
new_node->next = std::move(head.next);
head.next = std::move(new_node);
}

template<typename Function>
void for_each(Function f) {
node *current = &head;
std::unique_lock <std::mutex> lk(head.m);
while (node *const next = current->next.get()) {
std::unique_lock <std::mutex> next_lk(next->m);
lk.unlock();
f(*next->data);
current = next;
lk = std::move(next_lk);
}
}

template<typename Predicate>
std::shared_ptr <T> find_first_if(Predicate p) {
node *current = &head;
std::unique_lock <std::mutex> lk(head.m);
while (node *const next = current->next.get()) {
std::unique_lock <std::mutex> next_lk(next->m);
lk.unlock();
if (p(*next->data)) {
return next->data;
}
current = next;
lk = std::move(next_lk);
}
return std::shared_ptr<T>();
}

template<typename Predicate>
void remove_if(Predicate p) {
node *current = &head;
std::unique_lock <std::mutex> lk(head.m);
while (node *const next = current->next.get()) {
std::unique_lock <std::mutex> next_lk(next->m);
if (p(*next->data)) {
std::unique_ptr <node> old_next = std::move(current->next);
current->next = std::move(next->next);
next_lk.unlock(); // 若无此语句,则将导致销毁已上锁的互斥锁——未定义行为
} else {
lk.unlock();
current = next;
lk = std::move(next_lk);
}
}
}
};

实例分析

push_front

首先在锁外构建新节点,并默认初始化其next节点为nullptr。此后,在锁内获取当前头部节点,完成节点链接。由于只需要锁住一个互斥锁,因此死锁发生率为0。性能开销较大的内存分配位于锁外,无性能问题。

for_each

与大多数标准库算法类似,for_each将对Function对象执行值传递(确保无论是函数还是函数对象都能正常工作)。需要关注的点在于交叉(hand-over-hand)上锁。首先,锁住cur节点的互斥锁以安全访问其next节点。若该节点不为nullptr,则锁住该节点互斥锁,同时释放cur节点互斥锁,此后待函数执行完毕后,更新cur节点,并重新上锁cur。

find_first_if

与for_each类似,区别在于一旦符合谓词则直接返回。

remove_if

交叉上锁的步骤与for_each类似。
若谓词返回true,此时更新cur->next断开链接,释放next对应的互斥锁,std::unique_ptr将负责删除节点,cur无需更新(仍需检查后续节点)。
若谓词返回false,正常迭代。