前言
使用本节描述的技巧编码,开发者可以更多地将注意力集中至同步操作本身——使用函数式编程技法实现并发编程。
相较于在多个线程间直接共享数据,每个任务均拥有自己的数据似乎更为优雅,并且可将任务结果广播至其它线程——这需要使用future实现。
使用future的函数式编程
函数式编程(functional programming)指的是一种编程范式,在该范式下,函数结果仅依赖于传入参数,并不依赖外部状态,其命名与数学概念相关。简而言之,若函数输入相同,则函数输出相同。C++标准库中很多与数学相关的函数都有这个特性,例如sin,cos,sqrt以及简单算术运算。
如果不存在任何试图修改共享数据的行为,数据竞争也就不复存在,那么诸如mutex之类的数据保护机制也无需使用。因此,HasKell这类函数式语言日益流行于并发编程。由于大多数操作都仅具备只读性,因此开发者仅需要关心存在写入的操作。
future是我们攻克函数式并发编程的最后一个难点。我们可以在线程间传递future,以此来保证某一次计算结果取决于另一次计算结果,并且无需任何对共享数据的显式访问。
快排实例
函数式快排
下图是一张快排实例图。
快排的函数式编程实例如下文所示,需要注意的是,其返回值是一个list
,而非像std::sort
一样就地排序:
1 | template<typename T> |
首先,我们通过splice操作指定input的首个元素为基准点(由于并非选取了最优基准点,因此比较次数与交换次数相有所提升,但为了选取最优基准点引入的list遍历操作也同样耗时)。由于我们知道基准点必然出现在res中,因此使用splice将其添加进来是合理的。在lambda(比较函数)中,我们采取引用捕获的方式以避免不必要的拷贝。接着使用partition
确保了list被按照大小分为2部分,并同样通过splice将其拆分为两个list,接着递归执行排序。最终,我们通过splice操作将排序完成的两个区间拼接在一起,至此快排完成。
并发函数式快排
以下是使用了future
的并发函数式快排实例。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input){
if(input.empty()){
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin());
T const& pivot=*result.begin();
auto divide_point=std::partition(input.begin(),input.end(),[&](T const& t){return t<pivot;});
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),divide_point);
std::future<std::list<T> > new_lower(std::async(¶llel_quick_sort<T>,std::move(lower_part)));
auto new_higher(parallel_quick_sort(std::move(input)));
result.splice(result.end(),new_higher);
result.splice(result.begin(),new_lower.get());
return result;
}
和上个实例相比,二者最大的区别是本例使用std::async
在另一个线程排序lower_part,upper_part则与原来一致。使用std::async
的一大优势在于充分利用了硬件并发性,举例而言,如果向下执行3次递归,则将启动八个线程执行计算任务,如果执行10次递归(当list规模在1000左右时),将启动1024个线程执行计算。如果当前待执行的任务已经超出了硬件并发资源,则子任务(spawn_task)将被同步执行。这些子任务将在调用get()的线程中运行,从而避免于事无补地将任务传递至另一个线程。需要注意的是,若无std::launch::async的显式指定,std::async将为每一个任务启动一个新线程(即使当前已经超出了硬件最大并发量),有关std::async的启动机制,具体可见https://xander.wiki/post/61b11a8d.html 。此外,由于new_lower是一个future,因此我们必须通过get来等待后台任务运行完成,接着执行splice。get的返回值是一个右值,可以直接move。最后,在此实例中std::partition
依然是同步的,因此可以继续改进。
除了使用std::async
自动生成spawn_task
,开发者也可以通过简单封装std::packaged_task
与std::thread
,撰写自己的spawn_task()
(如下述代码所示)。
1 | template<typename F,typename A> |
这种做法相较于std::async
并无优势可言(并且会导致大量的massive oversubcription
),但却为“将精密事务迁移入由线程池维护的队列”——这一任务铺平了康庄大道(我们将在第九章详细论述)。
CSP编程范式
函数式编程并非是规避共享数据竞争的唯一途径,CSP(Communicating Sequential Processes)范式具备同样的能力。在此编程范式中,线程完全独立的,不再共享数据,但依旧支持线程间通信。
通过信息传递机制实现同步
CSP范式的理念很简单:如果不再存在共享数据,那么开发者可以通过[某一线程]对传入信息的响应方式来推断线程状态。在此情形下,每一个线程实际上都是一个状态机:线程将根据接受到的信息更新自身状态,并将一条或多条信息发送至别的线程,这些行为完全取决于线程自身的初始状态。
由于C++ thread共享地址空间,因此无法从语言层面保证不存在共享数据,开发者必须从实现层面上保证这一点。当然,消息队列必须为各线程所共享,但细节可以被封装至库内。
设计实例
设想现在正在为一台ATM机撰写程序,我们不仅需要处理与使用者的交互,还需要处理与银行后台系统的交互,同样,我们还需要操作物理设备以便让使用者能够自如地完成插卡,取钱,拔卡等操作。
该系统的一种实现方式是拆分任务,使其在三个独立的线程上运行:一个操作物理设备,一个处理ATM逻辑,一个与银行后台通信。这些线程完全可以仅通过信息传递完成通信,根本不需要共享数据。举例而言,当使用者按下按键后,操作物理设备的线程将发送信息至处理ATM逻辑的线程,后者又将发送信息至前者,指示应该吐出多少钱。
ATM 逻辑可以抽象为一个状态机,该线程内部维护一个状态,并根据下一次传入的信息更新自身状态,并反复循环,下图即为一次简单的状态流转。
表征ATM逻辑的类具备一些展现自身状态的成员函数。每一个成员函数都与特定的传入信息相关,并根据传入信息更新内部状态,与此同时,每一种信息类型都表现为一种特定的结构体。具体而言,ATM机等待插卡的逻辑可以简化为下述代码。
1 | struct card_inserted { |
getting_pin
的设计要较为复杂一些,有简化版实例如下所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26void atm::getting_pin() {
incoming.wait()
// 存在三种可传入信息
// 因此handle呈链状
// 每一次对handle的调用都会将消息类型特化为模板参数
.handle<digit_pressed>([&](digit_pressed const& msg){
unsigned const pin_length=4;
pin+=msg.digit;
// 密码尚未输入完时不会改变状态,因此可以持续输入
if(pin.length()==pin_length) {
bank.send(verify_pin(account,pin,incoming));
state=&atm::verifying_pin;
}
}
)
.handle<clear_last_pressed>([&](clear_last_pressed const& msg){
if(!pin.empty()) {
pin.resize(pin.length()-1);
}
}
)
.handle<cancel_pressed>([&](cancel_pressed const& msg){
state=&atm::done_processing;
}
);
}
由上文可见,代码中根本不存在同步和并发,ATM logic和系统的其他组件分别运行在独立的线程上。这种设计手法被称为Actor model——系统内存在多个运行于独立线程上的actor,每一个都通过发送和接受消息来处理当前任务,除了直接通过信息传入的状态外,彼此没有任何共享状态。