同步并发操作——使用future等待一次性事件

前言

 
本节我们将讨论线程应当以何种方式处理一次性事件,即线程仅需要被notice一次。

future

 
C++标准库将这种一次性事件建模为future。当线程需要等待某个特定的一次性事件时,它将持有表征该事件的future。随后,线程将以一个较短的时间周期检查事件是否触发。当然,检查期间也会执行其他任务。又或者,线程可以在等待任务期间可以先执行一些别的任务,直到对应的任务触发(此时future的状态变为ready),此后,future将无法被重置。

C++标准库提供了两种future:unique_futurestd::future<>与shared futurestd::shared_future<>,它们都位于头文件<future>内。一个std::future实例与一个指定事件相关联,而多个std::shared_future可以共享共一个事件,并在事件完成后同时ready,共享与事件相关的任何数据。如果事件不存在关联数据,则可以使用std::future<void>std::shared_future<void>

尽管future被广泛应用于线程间通信,但future对象本身并不提供同步访问。若当前存在多个线程需要访问同一个future对象,那么必须使用互斥量或类似的同步机制对数据访问进行保护。但std::shared_future存在一个特例:多个线程可以访问自己的std :: shared_future副本而无需同步。

最基础的一次性事件是获取线程函数返回值,由于std::thread`并没有提供一个简单的返回任务结果的接口,因此我们将使用future来实现。


带有返回值的后台任务

 
假设当前存在一个耗时颇久的计算任务,其结果并不需要立即获得,显然,该任务应当作为后台进程。

开发者可以使用std::async来启动当前无需立即得到结果的异步任务,std::async返回一个持有函数返回值的std::future对象,调用get()成员函数可以得到返回结果(调用后,当前线程将阻塞至future状态变为ready)。标准使用案例如下所示:

1
2
3
4
5
6
7
8
9
#include <future> 
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();
int main() {
std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}

类似于std::thread,future也支持调用者通过添加额外的参数向其内置任务传递参数。

  1. 若第一个参数是指向一个成员函数的指针,第二个参数提供有该类的具体对象(无论是直接传入,通过指针还是包装于std::ref中),剩余的参数将视作成员函数参数。
  2. 若不满足条件1,第二个及其之后的参数将视作传入函数或可调用对象的参数。
    若传入参数为右值,异步任务中的拷贝将直接通过移动构造函数完成创建,因此std::async支持将move only对象作为参数传入。具体使用如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    #include <string> 
    #include <future>
    struct X {
    void foo(int,std::string const&);
    std::string bar(std::string const&);
    };
    X x;
    auto f1 = std::async(&X::foo,&x,42,"hello");// 调用p->foo(42,"hello") p为成员函数指针
    auto f2 = std::async(&X::bar,x,"goodbye");// tmpx.bar("goodbye") tmpx是x的拷贝副本


    struct Y {
    double operator()(double);
    };
    Y y;
    auto f3=std::async(Y(),3.141);// tempy(3.141) tempy由y移动构造函数生成
    auto f4=std::async(std::ref(y),2.718);// y(2.718)


    X baz(X&);
    std::async(baz,std::ref(x)); // 调用baz(x)


    class move_only {
    public:
    move_only();
    move_only(move_only&&)
    move_only(move_only const&) = delete;
    move_only& operator=(move_only&&);
    move_only& operator=(move_only const&) = delete;
    void operator()();
    };
    auto f5=std::async(move_only()); // 调用tempm,tempm由std::move(move_only())移动构造生成
    值得注意的是,std::async并不保证任务被异步执行,更进一步地说,std::async的默认启动策略既可能启动一个新线程运行任务,但该任务也可能仅在future被调用get()wait()时才会运行。为了明确任务启动策略,开发者可向std::async传入一个额外参数类型std::lunch,它是一个enum class,所含枚举量如下:
  3. std::launch::async,意味着函数必然将运行于另一个线程之上(即异步运行)
  4. std::launch::defered,函数的执行被推迟至调用getwait时。当调用getwait时,函数将同步执行(即阻塞调用程序,直到函数完成运行)。如果一直不调用getwait,f将永不执行。

其使用如下所示:

1
2
3
auto f6=std::async(std::launch::async,Y(),1.2); // 在新线程上执行 
auto f7=std::async(std::launch::deferred,baz,std::ref(x)); // 在 wait()或get()调用时执行
auto f8=std::async(std::launch::deferred | std::launch::async, baz,std::ref(x)); // 默认启动策略


任务与期望

 
std::async并非是future与任务关联的唯一方式,开发者也可以将task包装至std::packaged_taskstd::promise。前者的抽象级别更高。

packaged_task

std::packaged_task将一个future绑定至函数或可调用对象。当std::packaged_task对象被调用时,它将调用相关函数或可调用对象,将future状态置为就绪,并将返回值存储于其中。

std::packaged_task常用于构建线程池的结构单元或管理其他任务,如在任务所在线程运行任务(同步),或在某后台线程中按顺序运行所有任务。如果当前存在一个粒度较大的操作,并且该操作可分解为独立的子任务时,则可将各子任务包装于std::packaged_task中,并将该实例传递给任务调度器或线程池。这种操作对任务的细节进行了抽象,调度器仅处理std::packaged_task实例,而非处理单独的函数。

std::packaged_task的模板参数是一个函数签名,如void()int(string&,double)std::packaged_task的构造函数必须传入一个函数或可调用对象,它们的形参与返回值必须能够隐式转为模板参数,即不需要严格地一致。

以下是一个std::packaged_task的模板偏特化实现:

1
2
3
4
5
6
7
8
template<> 
class packaged_task<std::string(std::vector<char>*,int)> {
public:
template<typename Callable>
explicit packaged_task(Callable&& f); // f的形参必须能隐式转为std::vector<char>*,int
std::future<std::string> get_future();// f的返回值必须能隐式转为std::string
void operator()(std::vector<char>*,int);
};

std::packaged_task是一个可调用对象,它可以被封装至std::function中,可以作为线程函数传递给std::thread,可以作为另一个函数的实参(该函数接受一个可调用对象),甚至可以被直接调用。当std::packaged_task作为函数对象被调用时,提供至其operator()的参数被传递至其内部包含的函数,返回值将作为异步运行结果存储于get_future提供的std::future中。

使用实例

许多GUI框架都利用某个特定的更新线程完成界面更新,因此当某个线程需要更新当前界面时,它将发送信息至更新线程,敦促后者完成界面更新操作。通过std::packaged_task我们可以实现该需求,并且杜绝了发送自定义信息的需要。具体实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <deque> 
#include <mutex>
#include <future>
#include <thread>
#include <utility>

std::mutex m;
std::deque<std::packaged_task<void()> > tasks;

bool gui_shutdown_message_received();
void get_and_process_gui_message();

void gui_thread() {
while(!gui_shutdown_message_received()) {
get_and_process_gui_message(); // 1
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if(tasks.empty())
continue;
task = std::move(tasks.front());
tasks.pop_front();
} // 此处互斥量的锁被释放
task();
}
}

std::thread gui_bg_thread(gui_thread);

// 将任务置入队列
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f) {
std::packaged_task<void()> task(f);
std::future<void> res = task.get_future();
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task));
return res;
}

这段程序十分易懂:GUI线程将反复循环,直至收到一条关闭图形界面的命令。在循环过程中,该线程将轮询界面消息(时间点1),处理诸如用户点击或当前存在于队列内的任务,当前队列内不存在任务时则继续循环。若当前队列内存在任务,则使用std::move提取该任务,随后释放锁并执行任务。此时future与任务相关,当任务执行完毕后其状态将被置为ready。

将任务置入队列也很简单:首先利用传入参数f创建一个task,并通过该task的成员函数get_future获取future对象,在将task推入队列后返回该future。若当前明确需要了解任务是否完成,函数调用方可等待future,否则直接丢弃。

在该实例中我们使用了std::packaged_task<void()>,这表明我们期待传入参数是一个无参数无返回值函数或可调用对象(若其存在返回值,则该返回值将被丢弃)。


promise

对于一些packaged_task不能解决的问题,如任务为非简单调用或存在多个不同来源的任务,我们可以通过std::promise来显式地设定future。

问题实例

若当前存在一个需要处理大量网络连接的应用程序,一个线程对应一个连接的设计非常易于实现。但是该方案仅适用于网络连接较少的情况,随着连接数的增加(亦即线程数的增加),操作系统资源被大量消耗,并且可能引发较多的上下文切换(线程数目超过硬件核心数),最终导致程序性能下降。为了避免上述问题,在网络连接较多时,通常一个线程会处理多个网络连接。

对于需要处理多个网络连接的线程而言,来自不同网络连接的数据包将会被乱序处理,同样地,需要被发送的数据包也将被乱序发送。在大多数情况下,应用程序的其他线程不是在等待数据发送成功,就是在等待数据接收成功。

std::promise<T>std::future<T>相关联,它将为上述疑难杂症提供如下机制:数据等待线程可以由futur阻塞,而数据发送线程可通过promise设定相关值,并将future状态置为ready。类似于packaged_task,future对象可通过get_future成员函数显式获得。当std::promise调用set_value成员函数设定相关值完毕后,future对象状态将被置为ready,并可用于检索存储值。若你在调用set_value前析构std::promise,将会存储一个异常。

下述代码展示了如何利用std::promisestd::future完成单线程处理多网络连接。在此实例中,开发者使用std::promise<bool>std::future<bool>表征数据块的成功传输,与future相关的数据是简单的bool量:成功/失败。对于数据传入,该量则为数据包的有效负载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <future>

void process_connections(connection_set& connections) {
while(!done(connections)){
for(connection_iterator connection=connections.begin(),end=connections.end(); connection!=end; ++connection) {
if(connection->has_incoming_data()){
data_packet data=connection->incoming();
std::promise<payload_type>& p = connection->get_promise(data.id);
p.set_value(data.payload);
}
if(connection->has_outgoing_data()){
outgoing_packet data= connection->top_of_outgoing_queue();
connection->send(data.payload);
data.promise.set_value(true);
}
}
}
}

process_connections将持续循环至处理完毕,每一次循环都将遍历当前所有连接,检索传入数据或发送当前队列顶端数据。在本实例中,我们假设所有传入数据均具备属性IDpayload


利用future存储异常

 
当-1作为实参被传入下述程序时将发生异常:

1
2
3
4
5
6
double square_root(double x) {
if(x<0) {
throw std::out_of_range(“x<0”);
}
return sqrt(x);
}

若当前该函数被异步调用,将会导致什么结果?

1
2
std::future<double> f=std::async(square_root,-1); 
double y=f.get();

该问题的答案是:异常将代替返回值,被存储至future中(此时future被置位为ready)。当future对象调用get成员函数时,该异常将被重新抛出(C++标准并没有明确规定抛出原始异常还是其副本,具体实现由各编译器决定)。被封装至std::packaged_task内的函数或可调用对象抛出异常也是如此。

std::promise同样会将异常存储于future中,但除此之外,它还提供有显式接口。若开发者明确当前需要存储异常,则可以直接通过set_exception成员函数完成需求。显然,set_exception一般出现在catch语句中:

1
2
3
4
5
6
7
extern std::promise<double> some_promise; 
try {
some_promise.set_value(calculate_value());
}
catch(...) {
some_promise.set_exception(std::current_exception());
}

这里使用了std::current_exception来检索当前异常,存在一种替代方案:使用std::copy_exception()直接存储新的异常而不抛出:
1
some_promise.set_exception(std::copy_exception(std::logic_error("foo ")));

显然,在明确异常类型的前提下,这种写法不仅使得try catch代码块更加直观,也给了编译器更大的优化空间。

另一种存储异常的方法是:在调用std::promise的set成员函数之前析构std::promise对象,亦或在调用std::packaged_task中的任务之前析构std::packaged_task。在future状态为ready之前析构std::promise将导致析构函数存储一个std::future_error异常,其错误码为std::future_errc::broken_promise


shared_future

 
尽管std::future可以处理所有在线程间数据转移的必要同步,但对std::future成员函数的调用并不彼此同步,也就是说在多线程下没有保护措施地访问一个future对象将导致数据竞争和未定义行为。这由std::future的特性决定:std::future独享同步结果的所有权,在get成员函数调用后将不会存在任何可获取的值。

如果当前存在多个线程等待同一个事件,那么我们应当使用std::shared_future。顾名思义,std::future是一个move only object,而std::shared_future可被拷贝。但std::shared_future也不具备调用成员函数时的同步性,因此在使用时必须以锁保护。

使用std::shared_future的首选方案是将其副本传递给每一个需要使用的线程,如此则可保证所有线程均可在其内部通过std::shared_future安全访问结果,使用实例如下图所示。
屏幕快照 2019-04-28 上午9.12.54.png-302.9kB

std::shared_future的一大潜在用途是实现复杂表格中的并行运行。每个单元个均具备计算结果,并且该值可被其他单元格引用,因此公式内部可使用std::shared_future对象表征单元格计算结果。在堆所有单元格公式执行并行计算后,所有可被计算的结果将直接计算,而尚存在依赖的单元格计算线程将被阻塞,直到相关数据准备就绪。这种方案最大程度地利用了当前可用的硬件并发。

某些std::shared_futurestd::future构造而来,由于后者不具备拷贝属性,因此在构造时我们应当使用std::move

1
2
3
4
5
6
std::promise<int> p; 
std::future<int> f(p.get_future());
assert(f.valid()); // true
std::shared_future<int> sf(std::move(f));
assert(!f.valid()); // true
assert(sf.valid()); // true

由于右值具备隐式的移动操作,因此我们可以直接使用get_future返回值构造std::shared_future:
1
2
std::promise<std::string> p; 
std::shared_future<std::string> sf(p.get_future());

当然了,也可以不必这么麻烦,std::future的成员函数share可以直接创建一个std::shared_future,并将所有权直接传递给它:
1
2
std::promise<std::map<SomeIndexType, SomeDataType, SomeComparator, SomeAllocator>::iterator> p; 
auto sf = p.get_future().share();

share的存在使得auto成为可能,从而避免打出大量错综复杂的类型名。