同步并发操作——使用同步操作简化代码

前言

 
使用本节描述的技巧编码,开发者可以更多地将注意力集中至同步操作本身——使用函数式编程技法实现并发编程。

相较于在多个线程间直接共享数据,每个任务均拥有自己的数据似乎更为优雅,并且可将任务结果广播至其它线程——这需要使用future实现。


使用future的函数式编程

 
函数式编程(functional programming)指的是一种编程范式,在该范式下,函数结果仅依赖于传入参数,并不依赖外部状态,其命名与数学概念相关。简而言之,若函数输入相同,则函数输出相同。C++标准库中很多与数学相关的函数都有这个特性,例如sin,cos,sqrt以及简单算术运算。

如果不存在任何试图修改共享数据的行为,数据竞争也就不复存在,那么诸如mutex之类的数据保护机制也无需使用。因此,HasKell这类函数式语言日益流行于并发编程。由于大多数操作都仅具备只读性,因此开发者仅需要关心存在写入的操作。

future是我们攻克函数式并发编程的最后一个难点。我们可以在线程间传递future,以此来保证某一次计算结果取决于另一次计算结果,并且无需任何对共享数据的显式访问。


快排实例

函数式快排

下图是一张快排实例图。
WX20200311-123528@2x.png-186.3kB
快排的函数式编程实例如下文所示,需要注意的是,其返回值是一个list,而非像std::sort一样就地排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<typename T> 
std::list<T> sequential_quick_sort(std::list<T> input) {
if(input.empty()){
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin());
T const& pivot=*result.begin();
auto divide_point=std::partition(input.begin(),input.end(),[&](T const& t){return t<pivot;});
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),divide_point);
auto new_lower(sequential_quick_sort(std::move(lower_part)));
auto new_higher(sequential_quick_sort(std::move(input)));
result.splice(result.end(),new_higher);// end is the pivot
result.splice(result.begin(),new_lower);
return result;
}

首先,我们通过splice操作指定input的首个元素为基准点(由于并非选取了最优基准点,因此比较次数与交换次数相有所提升,但为了选取最优基准点引入的list遍历操作也同样耗时)。由于我们知道基准点必然出现在res中,因此使用splice将其添加进来是合理的。在lambda(比较函数)中,我们采取引用捕获的方式以避免不必要的拷贝。接着使用partition确保了list被按照大小分为2部分,并同样通过splice将其拆分为两个list,接着递归执行排序。最终,我们通过splice操作将排序完成的两个区间拼接在一起,至此快排完成。


并发函数式快排

以下是使用了future的并发函数式快排实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<typename T> 
std::list<T> parallel_quick_sort(std::list<T> input){
if(input.empty()){
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin());
T const& pivot=*result.begin();
auto divide_point=std::partition(input.begin(),input.end(),[&](T const& t){return t<pivot;});
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),divide_point);
std::future<std::list<T> > new_lower(std::async(&parallel_quick_sort<T>,std::move(lower_part)));
auto new_higher(parallel_quick_sort(std::move(input)));
result.splice(result.end(),new_higher);
result.splice(result.begin(),new_lower.get());
return result;
}

和上个实例相比,二者最大的区别是本例使用std::async在另一个线程排序lower_part,upper_part则与原来一致。使用std::async的一大优势在于充分利用了硬件并发性,举例而言,如果向下执行3次递归,则将启动八个线程执行计算任务,如果执行10次递归(当list规模在1000左右时),将启动1024个线程执行计算。如果当前待执行的任务已经超出了硬件并发资源,则子任务(spawn_task)将被同步执行。这些子任务将在调用get()的线程中运行,从而避免于事无补地将任务传递至另一个线程。需要注意的是,若无std::launch::async的显式指定,std::async将为每一个任务启动一个新线程(即使当前已经超出了硬件最大并发量),有关std::async的启动机制,具体可见https://xander.wiki/post/61b11a8d.html 。此外,由于new_lower是一个future,因此我们必须通过get来等待后台任务运行完成,接着执行splice。get的返回值是一个右值,可以直接move。最后,在此实例中std::partition依然是同步的,因此可以继续改进。

除了使用std::async自动生成spawn_task,开发者也可以通过简单封装std::packaged_taskstd::thread,撰写自己的spawn_task()(如下述代码所示)。

1
2
3
4
5
6
7
8
9
template<typename F,typename A> 
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f,A&& a) {
typedef std::result_of<F(A&&)>::type result_type;
std::packaged_task<result_type(A&&)> task(std::move(f)));
std::future<result_type> res(task.get_future());
std::thread t(std::move(task),std::move(a));
t.detach();
return res;
}

这种做法相较于std::async并无优势可言(并且会导致大量的massive oversubcription),但却为“将精密事务迁移入由线程池维护的队列”——这一任务铺平了康庄大道(我们将在第九章详细论述)。

CSP编程范式

 

函数式编程并非是规避共享数据竞争的唯一途径,CSP(Communicating Sequential Processes)范式具备同样的能力。在此编程范式中,线程完全独立的,不再共享数据,但依旧支持线程间通信。

通过信息传递机制实现同步

CSP范式的理念很简单:如果不再存在共享数据,那么开发者可以通过[某一线程]对传入信息的响应方式来推断线程状态。在此情形下,每一个线程实际上都是一个状态机:线程将根据接受到的信息更新自身状态,并将一条或多条信息发送至别的线程,这些行为完全取决于线程自身的初始状态。

由于C++ thread共享地址空间,因此无法从语言层面保证不存在共享数据,开发者必须从实现层面上保证这一点。当然,消息队列必须为各线程所共享,但细节可以被封装至库内。

设计实例

设想现在正在为一台ATM机撰写程序,我们不仅需要处理与使用者的交互,还需要处理与银行后台系统的交互,同样,我们还需要操作物理设备以便让使用者能够自如地完成插卡,取钱,拔卡等操作。

该系统的一种实现方式是拆分任务,使其在三个独立的线程上运行:一个操作物理设备,一个处理ATM逻辑,一个与银行后台通信。这些线程完全可以仅通过信息传递完成通信,根本不需要共享数据。举例而言,当使用者按下按键后,操作物理设备的线程将发送信息至处理ATM逻辑的线程,后者又将发送信息至前者,指示应该吐出多少钱。

ATM 逻辑可以抽象为一个状态机,该线程内部维护一个状态,并根据下一次传入的信息更新自身状态,并反复循环,下图即为一次简单的状态流转。
WX20200311-204143@2x.png-288.2kB
表征ATM逻辑的类具备一些展现自身状态的成员函数。每一个成员函数都与特定的传入信息相关,并根据传入信息更新内部状态,与此同时,每一种信息类型都表现为一种特定的结构体。具体而言,ATM机等待插卡的逻辑可以简化为下述代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
struct card_inserted {
std::string account;
};

class atm {
messaging::receiver incoming;
messaging::sender bank;
messaging::sender interface_hardware;

void (atm::*state)();

std::string account;
std::string pin;

void waiting_for_card() {
interface_hardware.send(display_enter_card());
incoming.wait().
handle<card_inserted>([&](card_inserted const& msg){
// handle仅接受类型为card_inserted的信息
// 别的信息传入将导致丢弃
account=msg.account;
pin="";
interface_hardware.send(display_enter_pin());
state=&atm::getting_pin;
// 执行结束后更新状态
// run()内将执行新状态函数
}
);
}
void getting_pin();

public:
void run() {
state=&atm::waiting_for_card;
try {
for(;;) {
(this->*state)();
}
} catch(messaging::close_queue const&) {
}
};

getting_pin的设计要较为复杂一些,有简化版实例如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void atm::getting_pin() {
incoming.wait()
// 存在三种可传入信息
// 因此handle呈链状
// 每一次对handle的调用都会将消息类型特化为模板参数
.handle<digit_pressed>([&](digit_pressed const& msg){
unsigned const pin_length=4;
pin+=msg.digit;
// 密码尚未输入完时不会改变状态,因此可以持续输入
if(pin.length()==pin_length) {
bank.send(verify_pin(account,pin,incoming));
state=&atm::verifying_pin;
}
}
)
.handle<clear_last_pressed>([&](clear_last_pressed const& msg){
if(!pin.empty()) {
pin.resize(pin.length()-1);
}
}
)
.handle<cancel_pressed>([&](cancel_pressed const& msg){
state=&atm::done_processing;
}
);
}

由上文可见,代码中根本不存在同步和并发,ATM logic和系统的其他组件分别运行在独立的线程上。这种设计手法被称为Actor model——系统内存在多个运行于独立线程上的actor,每一个都通过发送和接受消息来处理当前任务,除了直接通过信息传入的状态外,彼此没有任何共享状态。