39.对于一次性事件通信考虑采用void future

前言

 
有时我们需要一个task告诉第二个异步运行的task当前已发生特定事件,因为第二个task可能在该事件尚不明确前无法继续。这些特定事件可能是某个数据结构已被初始化,又或者是某项计算已被完成,又或者是某个关键传感器值已被采集得到,那么我们应当采取何种方式以完成这种线程间通信?


条件变量

 
针对上个问题,使用条件变量(condvar)是一种非常显然的方案。我们将task分为两种,检测条件是否符合的detecting task与针对条件作出反应的reacting task,那么实现通信只需要通过一种简单的策略:reacting task等待条件变量的传入,而探测线程在事件触发时传递条件变量:

1
2
std::condition_variable cv; // condvar for event
std::mutex m; // mutex for use with cv

detecting task可以写为:
1
2
// detect event
cv.notify_one(); // tell reacting task

如果需要告知多个reacting task,则将notify_one改为notify_all即可。

reacting task的实现较为繁琐一些,因为在调用在condvar上调用wait之前,它必须通过std::unique_lock一个mutex(这种手法在线程库中相当常见,通过std::unique_lock锁定互斥锁的需求只是C++11 API的一部分)。以下将给出概念上的实现方案:

1
2
3
4
5
6
7
8
// prepare to react
{ // open critical section
std::unique_lock<std::mutex> lk(m); // lock mutex
cv.wait(lk); // wait for notify; this isn't correct!
// react to event(m is locked)
} // close crit. section;
// unlock m via lk's dtor
// continue reacting(m now unlocked)

上述代码存在一些问题,第一个问题在于我们是否需要使用mutex。mutex用于控制对共享数据的访问,但detecting task与reacting task完全有可能并不需要使用它。举例而言,例如,detecting task可能负责初始化全局数据结构以供reacting task使用。如果detecting task在初始化之后再不访问数据结构,并且reacting task在初始化完成前从不试图访问数据结构,那么我们没有任何理由需要使用mutex。

除了mutex的使用必要性之外,上述程序还存在两个问题:

  1. 如果detecting task在reacting task wait之前便已通知,那么reacting task将一直悬挂。
  2. wait语句存在虚假唤醒问题
    绝大多数线程API(并不仅仅只有C++)都存在条件变量尚未通知便已唤醒的情况,这被称为虚假唤醒。为了保证正确性,大多数程序往往在唤醒后第一时间确认条件变量是否通知,因此我们可以用lambda表征检测,然后将其传递给wait:
    1
    cv.wait(lk,[]{ return whether the event has occurred; });
    但这要求reacting task能够判定条件是否触发,而我们本来就是因为reacting task无法分辨而加入了条件变量。

共享布尔量

 
对于大多数开发者来说,他们可能会选择加入共享bool量来完成任务。该布尔量被初始化为false,当detecting task明确事件触发后将其置为true:

1
2
3
td::atomic<bool> flag(false); // shared flag; see Item 40 for std::atomic
// detect event
flag = true; // tell reacting task

如此一来reacting task只需要轮询flag即可明确事件是否发生:
1
2
3
// prepare to react
while (!flag); // wait for event
// react to event

这种方法没有任何基于condvar设计的缺点,但它所花费的成本太高。在task轮询的过程中task处于运行状态(尽管大多数时刻都是阻塞状态),因此它需要占用一个硬件线程,并且因此将产生context switch成本。condvar不存在这种缺陷,因为线程处于真正的阻塞状态。


condvar与bool的结合

 
在此设计中,flag指示事件是否发生,并且通过mutex完成对flag的访问,因此flag不再需要std::atomic。此时的detecting task如下所示:

1
2
3
4
5
6
7
8
9
std::condition_variable cv; // as before
std::mutex m;
bool flag(false); // not std::atomic
// detect event
{
std::lock_guard<std::mutex> g(m); // lock m via g's ctor
flag = true; // tell reacting task (part 1)
} // unlock m via g's dtor
cv.notify_one(); // tell reacting task(part 2)

reacting task实现如下所示:
1
2
3
4
5
6
7
8
// prepare to react
{ // as before
std::unique_lock<std::mutex> lk(m); // as before
cv.wait(lk, [] { return flag; }); // use lambda to avoid spurious wakeups
// react to event
// (m is locked)
}
// continue reacting(m now unlocked)

这一设计完全规避了之前提及的所有问题,但总的看来似乎仍然存在某些多余的开销,代码不够明晰流畅。


void future

 
另一种方法是通过让reacting task等待detecting task设置的future来规避condvar、mutex、flag的使用。 Item38指出,发送端是std::promise且接收端是future的通信信道完全可以用于双端通信。这项设计相当简单,detecting task持有std::promise对象(即通信信道的写入端),并且reacting task持有相应的future。当特定事件触发时,detecting task设定std::promise(即写入通信信道)。与此同时,reacting task在其future上执行wait。wait将阻塞reacting task直至std::promise已完成设定。

std::promise和future(包括std::future与std::shared_future)都是需要类型参数的模板,该类型参数指明需要通过通信信道发送的数据类型。然而在本次实例中,并没有任何需要传达的数据,reacting task唯一感兴趣的是其future是否已被设。因此本次数据类型为void,如此一来,即使reacting task不会从detecting task接收到任何数据,通信信道亦允许reacting task通过future判断事件是否触发。

综上,本设计有promise定义如下:

1
std::promise<void> p; // promise for communications channel

detecting task可写为:
1
2
// detect event
p.set_value(); // tell reacting task

reacting task可写为:
1
2
3
// prepare to react
p.get_future().wait(); // wait on future corresponding to p
// react to event


void future的不足

 
首先,Item38中我们便已明确,std::promise与future之间存在着共享状态,而共享状态往往动态分配,动态分配带来了堆分配与释放的开销。

此外,std::promise和future之间的通信通道是一次性机制:它不能重复使用,这与基于condvar和flag的设计有着明显区别,此二者均可用于多次通信。


void future与线程暂停技术

 
假设你需要暂停一个线程,使用void future的设计是一个合理的选择。以下是该技术的核心:

1
2
3
4
5
6
7
8
9
std::promise<void> p;
void react(); // func for reacting task
void detect(){ // func for detecting task
std::thread t([]{p.get_future().wait();react();});// suspend t until future is set
// here, t is suspended prior to call to react
p.set_value(); // unsuspend t (and thus call react)
// do additional work
t.join(); // make t unjoinable (see Item 37)
}

为了保证thread在所有路径均为unjoinable,上述程序可被改写为:
1
2
3
4
5
6
void detect(){
ThreadRAII tr(std::thread([]{p.get_future().wait();react();}),ThreadRAII::DtorAction::join);
// thread inside tr is suspended here
p.set_value(); // unsuspend thread inside tr

}

上述代码看似完美,但实际上存在问题。若在第一个”…”处抛出异常,则p将永远无法完成设定,因此tr将永远处于无法完成的状态。(作者将如何解决作为习题留给读者,我第一眼想到的方案是try catch,在catch语句中析构tr)。

多个reacting task的暂停

其关键在于使用std::shared_future以代替std::future。std::future的share成员函数将其共享状态的所有权转移到share生成的std::shared_future对象,本实现唯一的微妙之处即为每个reacting thread都需要自己的std::shared_future副本来引用共享状态,因此从share获取的std::shared_future由运行在reacting thread上的lambda按值捕获:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::promise<void> p; // as before
void detect(){ // now for multiple reacting tasks
auto sf = p.get_future().share(); // sf's type is std::shared_future<void>
std::vector<std::thread> vt; // container for reacting threads
for (int i = 0; i < threadsToRun; ++i) {
vt.emplace_back([sf]{ sf.wait();react(); }); // capture by-value
}
// detect hangs if this "…" code throws!
p.set_value(); // unsuspend all threads

for (auto& t : vt) { // make all threads unjoinable
t.join();
}
}