前言
本节我们将讨论线程应当以何种方式处理一次性事件,即线程仅需要被notice一次。
future
C++标准库将这种一次性事件建模为future
。当线程需要等待某个特定的一次性事件时,它将持有表征该事件的future。随后,线程将以一个较短的时间周期检查事件是否触发。当然,检查期间也会执行其他任务。又或者,线程可以在等待任务期间可以先执行一些别的任务,直到对应的任务触发(此时future的状态变为ready),此后,future将无法被重置。
C++标准库提供了两种future:unique_futurestd::future<>
与shared futurestd::shared_future<>
,它们都位于头文件<future>
内。一个std::future
实例与一个指定事件相关联,而多个std::shared_future
可以共享共一个事件,并在事件完成后同时ready,共享与事件相关的任何数据。如果事件不存在关联数据,则可以使用std::future<void>
与std::shared_future<void>
。
尽管future被广泛应用于线程间通信,但future对象本身并不提供同步访问。若当前存在多个线程需要访问同一个future对象,那么必须使用互斥量或类似的同步机制对数据访问进行保护。但std::shared_future
存在一个特例:多个线程可以访问自己的std :: shared_future
副本而无需同步。
最基础的一次性事件是获取线程函数返回值,由于std::thread`并没有提供一个简单的返回任务结果的接口,因此我们将使用future来实现。
带有返回值的后台任务
假设当前存在一个耗时颇久的计算任务,其结果并不需要立即获得,显然,该任务应当作为后台进程。
开发者可以使用std::async
来启动当前无需立即得到结果的异步任务,std::async
返回一个持有函数返回值的std::future
对象,调用get()
成员函数可以得到返回结果(调用后,当前线程将阻塞至future状态变为ready)。标准使用案例如下所示:1
2
3
4
5
6
7
8
9
int find_the_answer_to_ltuae();
void do_other_stuff();
int main() {
std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}
类似于std::thread
,future也支持调用者通过添加额外的参数向其内置任务传递参数。
- 若第一个参数是指向一个成员函数的指针,第二个参数提供有该类的具体对象(无论是直接传入,通过指针还是包装于
std::ref
中),剩余的参数将视作成员函数参数。 - 若不满足条件1,第二个及其之后的参数将视作传入函数或可调用对象的参数。
若传入参数为右值,异步任务中的拷贝将直接通过移动构造函数完成创建,因此std::async
支持将move only
对象作为参数传入。具体使用如下所示:值得注意的是,1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
struct X {
void foo(int,std::string const&);
std::string bar(std::string const&);
};
X x;
auto f1 = std::async(&X::foo,&x,42,"hello");// 调用p->foo(42,"hello") p为成员函数指针
auto f2 = std::async(&X::bar,x,"goodbye");// tmpx.bar("goodbye") tmpx是x的拷贝副本
struct Y {
double operator()(double);
};
Y y;
auto f3=std::async(Y(),3.141);// tempy(3.141) tempy由y移动构造函数生成
auto f4=std::async(std::ref(y),2.718);// y(2.718)
X baz(X&);
std::async(baz,std::ref(x)); // 调用baz(x)
class move_only {
public:
move_only();
move_only(move_only&&)
move_only(move_only const&) = delete;
move_only& operator=(move_only&&);
move_only& operator=(move_only const&) = delete;
void operator()();
};
auto f5=std::async(move_only()); // 调用tempm,tempm由std::move(move_only())移动构造生成std::async
并不保证任务被异步执行,更进一步地说,std::async
的默认启动策略既可能启动一个新线程运行任务,但该任务也可能仅在future被调用get()
或wait()
时才会运行。为了明确任务启动策略,开发者可向std::async
传入一个额外参数类型std::lunch
,它是一个enum class
,所含枚举量如下: std::launch::async
,意味着函数必然将运行于另一个线程之上(即异步运行)std::launch::defered
,函数的执行被推迟至调用get
或wait
时。当调用get
或wait
时,函数将同步执行(即阻塞调用程序,直到函数完成运行)。如果一直不调用get
或wait
,f将永不执行。
其使用如下所示:1
2
3auto f6=std::async(std::launch::async,Y(),1.2); // 在新线程上执行
auto f7=std::async(std::launch::deferred,baz,std::ref(x)); // 在 wait()或get()调用时执行
auto f8=std::async(std::launch::deferred | std::launch::async, baz,std::ref(x)); // 默认启动策略
任务与期望
std::async
并非是future与任务关联的唯一方式,开发者也可以将task包装至std::packaged_task
或std::promise
。前者的抽象级别更高。
packaged_task
std::packaged_task
将一个future绑定至函数或可调用对象。当std::packaged_task
对象被调用时,它将调用相关函数或可调用对象,将future状态置为就绪,并将返回值存储于其中。
std::packaged_task
常用于构建线程池的结构单元或管理其他任务,如在任务所在线程运行任务(同步),或在某后台线程中按顺序运行所有任务。如果当前存在一个粒度较大的操作,并且该操作可分解为独立的子任务时,则可将各子任务包装于std::packaged_task
中,并将该实例传递给任务调度器或线程池。这种操作对任务的细节进行了抽象,调度器仅处理std::packaged_task
实例,而非处理单独的函数。
std::packaged_task
的模板参数是一个函数签名,如void()
或int(string&,double)
。std::packaged_task
的构造函数必须传入一个函数或可调用对象,它们的形参与返回值必须能够隐式转为模板参数,即不需要严格地一致。
以下是一个std::packaged_task
的模板偏特化实现:
1 | template<> |
std::packaged_task
是一个可调用对象,它可以被封装至std::function
中,可以作为线程函数传递给std::thread
,可以作为另一个函数的实参(该函数接受一个可调用对象),甚至可以被直接调用。当std::packaged_task
作为函数对象被调用时,提供至其operator()
的参数被传递至其内部包含的函数,返回值将作为异步运行结果存储于get_future
提供的std::future
中。
使用实例
许多GUI框架都利用某个特定的更新线程完成界面更新,因此当某个线程需要更新当前界面时,它将发送信息至更新线程,敦促后者完成界面更新操作。通过std::packaged_task
我们可以实现该需求,并且杜绝了发送自定义信息的需要。具体实现如下所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
std::mutex m;
std::deque<std::packaged_task<void()> > tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread() {
while(!gui_shutdown_message_received()) {
get_and_process_gui_message(); // 1
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if(tasks.empty())
continue;
task = std::move(tasks.front());
tasks.pop_front();
} // 此处互斥量的锁被释放
task();
}
}
std::thread gui_bg_thread(gui_thread);
// 将任务置入队列
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f) {
std::packaged_task<void()> task(f);
std::future<void> res = task.get_future();
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task));
return res;
}
这段程序十分易懂:GUI线程将反复循环,直至收到一条关闭图形界面的命令。在循环过程中,该线程将轮询界面消息(时间点1),处理诸如用户点击或当前存在于队列内的任务,当前队列内不存在任务时则继续循环。若当前队列内存在任务,则使用std::move
提取该任务,随后释放锁并执行任务。此时future与任务相关,当任务执行完毕后其状态将被置为ready。
将任务置入队列也很简单:首先利用传入参数f创建一个task,并通过该task的成员函数get_future
获取future对象,在将task推入队列后返回该future。若当前明确需要了解任务是否完成,函数调用方可等待future,否则直接丢弃。
在该实例中我们使用了std::packaged_task<void()>
,这表明我们期待传入参数是一个无参数无返回值函数或可调用对象(若其存在返回值,则该返回值将被丢弃)。
promise
对于一些packaged_task不能解决的问题,如任务为非简单调用或存在多个不同来源的任务,我们可以通过std::promise
来显式地设定future。
问题实例
若当前存在一个需要处理大量网络连接的应用程序,一个线程对应一个连接的设计非常易于实现。但是该方案仅适用于网络连接较少的情况,随着连接数的增加(亦即线程数的增加),操作系统资源被大量消耗,并且可能引发较多的上下文切换(线程数目超过硬件核心数),最终导致程序性能下降。为了避免上述问题,在网络连接较多时,通常一个线程会处理多个网络连接。
对于需要处理多个网络连接的线程而言,来自不同网络连接的数据包将会被乱序处理,同样地,需要被发送的数据包也将被乱序发送。在大多数情况下,应用程序的其他线程不是在等待数据发送成功,就是在等待数据接收成功。
std::promise<T>
与std::future<T>
相关联,它将为上述疑难杂症提供如下机制:数据等待线程可以由futur阻塞,而数据发送线程可通过promise
设定相关值,并将future状态置为ready。类似于packaged_task
,future对象可通过get_future
成员函数显式获得。当std::promise
调用set_value
成员函数设定相关值完毕后,future对象状态将被置为ready
,并可用于检索存储值。若你在调用set_value
前析构std::promise
,将会存储一个异常。
下述代码展示了如何利用std::promise
与std::future
完成单线程处理多网络连接。在此实例中,开发者使用std::promise<bool>
与std::future<bool>
表征数据块的成功传输,与future相关的数据是简单的bool量:成功/失败。对于数据传入,该量则为数据包的有效负载。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void process_connections(connection_set& connections) {
while(!done(connections)){
for(connection_iterator connection=connections.begin(),end=connections.end(); connection!=end; ++connection) {
if(connection->has_incoming_data()){
data_packet data=connection->incoming();
std::promise<payload_type>& p = connection->get_promise(data.id);
p.set_value(data.payload);
}
if(connection->has_outgoing_data()){
outgoing_packet data= connection->top_of_outgoing_queue();
connection->send(data.payload);
data.promise.set_value(true);
}
}
}
}process_connections
将持续循环至处理完毕,每一次循环都将遍历当前所有连接,检索传入数据或发送当前队列顶端数据。在本实例中,我们假设所有传入数据均具备属性ID
与payload
。
利用future存储异常
当-1作为实参被传入下述程序时将发生异常:
1 | double square_root(double x) { |
若当前该函数被异步调用,将会导致什么结果?1
2std::future<double> f=std::async(square_root,-1);
double y=f.get();
该问题的答案是:异常将代替返回值,被存储至future中(此时future被置位为ready)。当future对象调用get
成员函数时,该异常将被重新抛出(C++标准并没有明确规定抛出原始异常还是其副本,具体实现由各编译器决定)。被封装至std::packaged_task
内的函数或可调用对象抛出异常也是如此。
std::promise
同样会将异常存储于future中,但除此之外,它还提供有显式接口。若开发者明确当前需要存储异常,则可以直接通过set_exception
成员函数完成需求。显然,set_exception
一般出现在catch语句中:1
2
3
4
5
6
7extern std::promise<double> some_promise;
try {
some_promise.set_value(calculate_value());
}
catch(...) {
some_promise.set_exception(std::current_exception());
}
这里使用了std::current_exception
来检索当前异常,存在一种替代方案:使用std::copy_exception()
直接存储新的异常而不抛出:1
some_promise.set_exception(std::copy_exception(std::logic_error("foo ")));
显然,在明确异常类型的前提下,这种写法不仅使得try catch代码块更加直观,也给了编译器更大的优化空间。
另一种存储异常的方法是:在调用std::promise
的set成员函数之前析构std::promise
对象,亦或在调用std::packaged_task
中的任务之前析构std::packaged_task
。在future状态为ready之前析构std::promise
将导致析构函数存储一个std::future_error
异常,其错误码为std::future_errc::broken_promise
。
shared_future
尽管std::future
可以处理所有在线程间数据转移的必要同步,但对std::future
成员函数的调用并不彼此同步,也就是说在多线程下没有保护措施地访问一个future对象将导致数据竞争和未定义行为。这由std::future
的特性决定:std::future
独享同步结果的所有权,在get
成员函数调用后将不会存在任何可获取的值。
如果当前存在多个线程等待同一个事件,那么我们应当使用std::shared_future
。顾名思义,std::future
是一个move only object
,而std::shared_future
可被拷贝。但std::shared_future
也不具备调用成员函数时的同步性,因此在使用时必须以锁保护。
使用std::shared_future
的首选方案是将其副本传递给每一个需要使用的线程,如此则可保证所有线程均可在其内部通过std::shared_future
安全访问结果,使用实例如下图所示。
std::shared_future
的一大潜在用途是实现复杂表格中的并行运行。每个单元个均具备计算结果,并且该值可被其他单元格引用,因此公式内部可使用std::shared_future
对象表征单元格计算结果。在堆所有单元格公式执行并行计算后,所有可被计算的结果将直接计算,而尚存在依赖的单元格计算线程将被阻塞,直到相关数据准备就绪。这种方案最大程度地利用了当前可用的硬件并发。
某些std::shared_future
由std::future
构造而来,由于后者不具备拷贝属性,因此在构造时我们应当使用std::move
:1
2
3
4
5
6std::promise<int> p;
std::future<int> f(p.get_future());
assert(f.valid()); // true
std::shared_future<int> sf(std::move(f));
assert(!f.valid()); // true
assert(sf.valid()); // true
由于右值具备隐式的移动操作,因此我们可以直接使用get_future
返回值构造std::shared_future
:1
2std::promise<std::string> p;
std::shared_future<std::string> sf(p.get_future());
当然了,也可以不必这么麻烦,std::future
的成员函数share
可以直接创建一个std::shared_future
,并将所有权直接传递给它:1
2std::promise<std::map<SomeIndexType, SomeDataType, SomeComparator, SomeAllocator>::iterator> p;
auto sf = p.get_future().share();share
的存在使得auto
成为可能,从而避免打出大量错综复杂的类型名。