同步并发操作——等待事件或条件

前言

 
在线程运行过程中经常发生如下场景:线程A需要等待线程B完成某操作。有三种方案可以解决这一需求:

  1. 反复检查共享标志位(例如mutex),直到某线程完成操作后将其置位。
    这将造成两点浪费:首先,线程不得不耗费宝贵的运行时间循环检查数据;其次,如果mutex被线程A上锁(它需要锁住互斥量以查看被其保护的标志位),那么其他线程将无法获得该锁,甚至连已完成任务的线程B也无法锁住互斥量以重置该标志位。

  2. 使用std::this_thread::sleep_for()令线程在等待间隙短暂休眠

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    bool flag; 
    std::mutex m;
    void wait_for_flag() {
    std::unique_lock<std::mutex> lk(m);
    while(!flag) {
    lk.unlock();
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 休眠100ms
    //....此时可以保证其他线程获取该锁,重置flag
    lk.lock();
    }
    }

    方案2优于方案1,但缺憾在于开发者无法确定休眠时间取多少合适。过少的休眠时间等于没有,过多的休眠时间又会造成任务必须等待线程苏醒。尽管休眠时间过长很少会对程序运行产生直接影响,但在对实时性较高的场景中(例如游戏画面绘制),这意味着一场灾难。

  3. 使用C++标准库提供的工具
    最基本的处理等待事件机制为条件变量(condition variable)。从本质而言,条件变量的存在与事件或条件相关联,并且必须存在一些等待该变量被满足的线程。若当前执行线程确定条件已被满足,那么它将通知一个或多个等待该条件变量被满足的线程,以便唤醒它们接着执行任务。

等待条件达成

 
C++标准库提供了了两种条件变量的实现:std::condition_variablestd::condition_variable_any,它们都位于头文件<condition_variable>内。

二者均需要与互斥量配合,不同之处在于std::condition_variable仅限于与std::mutex搭配使用,而std::condition_variable_any则可以与任何满足最低标准的互斥量一起工作,这也是_any后缀的由来。当然,灵活性会带来性能上的损耗,因此除非必要,我们应当尽可能使用std::condition_variable。以下将展示如何使用std::condition_variable实现条件唤醒:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;

void data_preparation_thread() {
while(more_data_to_prepare()) {
data_chunk const data = prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
}

void data_processing_thread() {
while(true) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[]{return !data_queue.empty();});
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock();// 时间点1
process(data);
if(is_last_chunk(data)) break;
}
}

首先存在一个用于在两个线程间传递数据的队列。当数据准备就绪时,准备数据的线程调用std ::lock_guard锁定保护队列的互斥锁,并将数据推送到队列中。接着该线程调用std:: condition_variable的成员函数notify_one()来通知可能存在的等待线程。

接下来我们将目光移至data_processing_thread。该线程首先执行对互斥量的锁定(使用的是unique_lock而非lock_gurad,具体原因将在下文提及)。接着线程调用condition_variable的成员函数wait。该函数接收一个lock对象与一个lambda表达式,当lambda表达式返回false时,wait将解锁互斥量,并将当前线程置于阻塞或等待状态。当其他线程调用notify_one()通知条件变量时,执行线程从睡眠中苏醒,重新获取互斥锁并检查执行条件(lambda表达式)。若当前条件满足,从wait()返回(互斥量保持锁定)。若不满足,则线程将解锁互斥量并继续等待。这也就是我们使用unique_lock的原因——我们需要随时加锁解锁的灵活性。如果线程处于休眠状态时互斥锁保持锁定,则数据准备线程将无法锁定互斥锁并把数据添加到队列中,此时等待线程的条件也将永远不会被满足。

unique_lock的存在不仅为了满足wait内频繁加锁与解锁的需要,若当前存在待处理数据,但此时处理操作尚未开始(如代码中时间点1),开发者同样需要调用unlock,原因在于处理数据可能非常耗时。在前文中我们曾经提及,持有锁的时间过长将造成种种负面影响。


使用条件变量构建线程安全队列

 
使用队列在线程之间传输数据是一种常见行为,既然如此,我们完全可以在队列内部实现同步,从而降低触发条件竞争的可能性。因此,本节将致力于使用条件变量构建通用型线程安全队列。

首先观察一下C++标准库中queue的对外接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template <class T, class Container = std::deque<T> > 
class queue {
public:
explicit queue(const Container&);
explicit queue(Container&& = Container());
template <class Alloc>
explicit queue(const Alloc&);
template <class Alloc>
queue(const Container&, const Alloc&);
template <class Alloc>
queue(Container&&, const Alloc&);
template <class Alloc> queue(queue&&, const Alloc&);

void swap(queue& q);
bool empty() const;
size_type size() const;
T& front();
const T& front() const;
T& back();
const T& back() const;
void push(const T& x);
void push(T&& x);
void pop();
template <class... Args>
void emplace(Args&&... args);
};

忽略构造、赋值、交换后,值得注意的接口有以下三种:

  1. 查询接口(empty()size())
  2. 数据接口(front()back())
  3. 修改接口(push()pop()emplace())

类似于stack,queue的接口同样存在固有条件竞争,因此需要将front()pop()合并至一个接口内。在前文queue的使用场景中,我们注意到接收线程常常需要等待数据,因此我们将pop()接口改为两种不同的操作方式:try_pop()wait_and_pop()。前者尝试从队列中弹出值,并总是立即返回(带有isSuccess标志);后者则持续等待至目标弹出。综上,threadsafe_queue接口如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <memory> // for std::shared_ptr
template<typename T>
class threadsafe_queue {
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
threadsafe_queue& operator=(const threadsafe_queue&) = delete; // delete assign
void push(T new_value);
bool try_pop(T& value); // 传入引用作为输出
std::shared_ptr<T> try_pop(); // 若失败则返回nullptr
void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();
bool empty() const;
};

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <queue> 
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue {
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
threadsafe_queue(threadsafe_queue const& other) {
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}

void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}

std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T> (data_queue.front()));
data_queue.pop();
return res;
}

bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=data_queue.front();
data_queue.pop();
return true;
}

std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T> (data_queue.front()));
data_queue.pop(); return res;
}

bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

其中,wait_and_pop的使用方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
threadsafe_queue<data_chunk> data_queue;

void data_preparation_thread() {
while(more_data_to_prepare()) {
data_chunk const data=prepare_data();
data_queue.push(data);
}
}

void data_processing_thread() {
while(true) {
data_chunk data;
data_queue.wait_and_pop(data);
process(data);
if(is_last_chunk(data)) break;
}
}


notify_one的不足与notify_any

 
notify_one()的不足在于,它将触发当前正在执行wait()一个线程检查其状态并从wait()返回,但开发者无法精准地通知到某个指定进程,即使该进程正处于等待状态。

若存在几个线程正在等待同一事件,并且它们都需要对该事件作出响应(应用场景如共享数据初始化或多个线程等待共享数据更新)。此时我们可以调用notify_all()这一成员函数,顾名思义,这将导致当前所有正在执行wait()的线程检查它们正在等待的条件。