前言
在线程运行过程中经常发生如下场景:线程A需要等待线程B完成某操作。有三种方案可以解决这一需求:
反复检查共享标志位(例如mutex),直到某线程完成操作后将其置位。
这将造成两点浪费:首先,线程不得不耗费宝贵的运行时间循环检查数据;其次,如果mutex被线程A上锁(它需要锁住互斥量以查看被其保护的标志位),那么其他线程将无法获得该锁,甚至连已完成任务的线程B也无法锁住互斥量以重置该标志位。使用
std::this_thread::sleep_for()
令线程在等待间隙短暂休眠1
2
3
4
5
6
7
8
9
10
11bool flag;
std::mutex m;
void wait_for_flag() {
std::unique_lock<std::mutex> lk(m);
while(!flag) {
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 休眠100ms
//....此时可以保证其他线程获取该锁,重置flag
lk.lock();
}
}方案2优于方案1,但缺憾在于开发者无法确定休眠时间取多少合适。过少的休眠时间等于没有,过多的休眠时间又会造成任务必须等待线程苏醒。尽管休眠时间过长很少会对程序运行产生直接影响,但在对实时性较高的场景中(例如游戏画面绘制),这意味着一场灾难。
使用C++标准库提供的工具
最基本的处理等待事件机制为条件变量(condition variable)。从本质而言,条件变量的存在与事件或条件相关联,并且必须存在一些等待该变量被满足的线程。若当前执行线程确定条件已被满足,那么它将通知一个或多个等待该条件变量被满足的线程,以便唤醒它们接着执行任务。
等待条件达成
C++标准库提供了了两种条件变量的实现:std::condition_variable
与std::condition_variable_any
,它们都位于头文件<condition_variable>
内。
二者均需要与互斥量配合,不同之处在于std::condition_variable
仅限于与std::mutex
搭配使用,而std::condition_variable_any
则可以与任何满足最低标准的互斥量一起工作,这也是_any
后缀的由来。当然,灵活性会带来性能上的损耗,因此除非必要,我们应当尽可能使用std::condition_variable
。以下将展示如何使用std::condition_variable
实现条件唤醒:
1 | std::mutex mut; |
首先存在一个用于在两个线程间传递数据的队列。当数据准备就绪时,准备数据的线程调用std ::lock_guard
锁定保护队列的互斥锁,并将数据推送到队列中。接着该线程调用std:: condition_variable
的成员函数notify_one()
来通知可能存在的等待线程。
接下来我们将目光移至data_processing_thread
。该线程首先执行对互斥量的锁定(使用的是unique_lock而非lock_gurad,具体原因将在下文提及)。接着线程调用condition_variable
的成员函数wait
。该函数接收一个lock对象与一个lambda表达式,当lambda表达式返回false时,wait
将解锁互斥量,并将当前线程置于阻塞或等待状态。当其他线程调用notify_one()
通知条件变量时,执行线程从睡眠中苏醒,重新获取互斥锁并检查执行条件(lambda表达式)。若当前条件满足,从wait()
返回(互斥量保持锁定)。若不满足,则线程将解锁互斥量并继续等待。这也就是我们使用unique_lock
的原因——我们需要随时加锁解锁的灵活性。如果线程处于休眠状态时互斥锁保持锁定,则数据准备线程将无法锁定互斥锁并把数据添加到队列中,此时等待线程的条件也将永远不会被满足。
unique_lock
的存在不仅为了满足wait
内频繁加锁与解锁的需要,若当前存在待处理数据,但此时处理操作尚未开始(如代码中时间点1),开发者同样需要调用unlock
,原因在于处理数据可能非常耗时。在前文中我们曾经提及,持有锁的时间过长将造成种种负面影响。
使用条件变量构建线程安全队列
使用队列在线程之间传输数据是一种常见行为,既然如此,我们完全可以在队列内部实现同步,从而降低触发条件竞争的可能性。因此,本节将致力于使用条件变量构建通用型线程安全队列。
首先观察一下C++标准库中queue
的对外接口:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26template <class T, class Container = std::deque<T> >
class queue {
public:
explicit queue(const Container&);
explicit queue(Container&& = Container());
template <class Alloc>
explicit queue(const Alloc&);
template <class Alloc>
queue(const Container&, const Alloc&);
template <class Alloc>
queue(Container&&, const Alloc&);
template <class Alloc> queue(queue&&, const Alloc&);
void swap(queue& q);
bool empty() const;
size_type size() const;
T& front();
const T& front() const;
T& back();
const T& back() const;
void push(const T& x);
void push(T&& x);
void pop();
template <class... Args>
void emplace(Args&&... args);
};
忽略构造、赋值、交换后,值得注意的接口有以下三种:
- 查询接口(
empty()
,size()
) - 数据接口(
front()
,back()
) - 修改接口(
push()
,pop()
,emplace()
)
类似于stack,queue的接口同样存在固有条件竞争,因此需要将front()
与pop()
合并至一个接口内。在前文queue的使用场景中,我们注意到接收线程常常需要等待数据,因此我们将pop()
接口改为两种不同的操作方式:try_pop()
与wait_and_pop()
。前者尝试从队列中弹出值,并总是立即返回(带有isSuccess
标志);后者则持续等待至目标弹出。综上,threadsafe_queue
接口如下所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<typename T>
class threadsafe_queue {
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
threadsafe_queue& operator=(const threadsafe_queue&) = delete; // delete assign
void push(T new_value);
bool try_pop(T& value); // 传入引用作为输出
std::shared_ptr<T> try_pop(); // 若失败则返回nullptr
void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();
bool empty() const;
};
具体实现
1 |
|
其中,wait_and_pop
的使用方法如下所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17threadsafe_queue<data_chunk> data_queue;
void data_preparation_thread() {
while(more_data_to_prepare()) {
data_chunk const data=prepare_data();
data_queue.push(data);
}
}
void data_processing_thread() {
while(true) {
data_chunk data;
data_queue.wait_and_pop(data);
process(data);
if(is_last_chunk(data)) break;
}
}
notify_one的不足与notify_any
notify_one()
的不足在于,它将触发当前正在执行wait()
的一个线程检查其状态并从wait()
返回,但开发者无法精准地通知到某个指定进程,即使该进程正处于等待状态。
若存在几个线程正在等待同一事件,并且它们都需要对该事件作出响应(应用场景如共享数据初始化或多个线程等待共享数据更新)。此时我们可以调用notify_all()
这一成员函数,顾名思义,这将导致当前所有正在执行wait()
的线程检查它们正在等待的条件。