Concurrency Control Flow 并发流程控制

jopen 10年前

异步回调:新时代的goto


     回调函数是指将函数(这里的函数是泛指某一块可执行代码的引用,如C++的仿函数或Java中的接口和对象)作为参数传递给另一个函数。回调函数不由函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应。因为可以把调用者被调用者分开,所以调用者不关心谁是被调用者。它只需知道存在一个具有特定原型和限制条件的被调用函数,以便该函数在处理相似事件的时候可以灵活的使用不同的方法。


      示例1,在C语言标准库中的qsort快速排序,其函数原型如下:

void qsort( void *buf, size_t num, size_t size,              int (*compare)(const void *, const void *) );

其中第四个参数 compare 就是回调函数,用于“描述”需要排序的数据的大小比较规则。


      示例2,在C语言标准库中的signal信号处理,其函数原型如下:

/* The use of sighandler_t is a GNU extension */  typedef void (*sighandler_t)(int);  sighandler_t signal(int signum, sighandler_t handler);

其中第二个参数handler就是回调函数,用于“注册”接收到某个信号时执行的内容。


      对比示例1和示例2。示例1中的回调函数是被同步调用的,调用者调用qsort函数,qsort函数调用compare函数,相当于调用者间接调了自己提供的回调函数,此类型的回调称为同步回调,调用者明确知道自己实现的回调函数是在什么时候被调用。示例2中的回调函数是被异步调用的,调用者首先将handler函数传给系统,系统记住这个函数,这称为注册回调函数,然后当信号产生时由系统调用handler函数进行处理,此类型的回调称为异步回调,调用者无法知道自己实现的回调函数是在什么时候被调用。可以明显的看出,异步回调有一个注册的过程,即注册回调函数的函数本身不会直接(立即)调用回调函数。


      异步回调实际上是把某个(需要等待的)任务后需要执行的内容通过回调函数传递下去。在函数编程中有一种编程风格叫Continuation-passing style(简称CPS,延续传递风格),CPS是回调函数使用上的特例,形式上就是在函数的最后调用回调函数,这样就好像把函数执行后的结果交给回调函数继续运行,所以称作延续传递风格。这里为何提到CPS呢,因为CPS中流程控制被显式传递给下一步操作,而大多数异步回调做的是相同的事,即完成X后(回调)执行Y,实质也就是干涉(控制)了流程。


      下面通过实际的C/C++代码分析一下,异步回调是如何编程的。

A操作执行完成后等待X任务完成后执行B操作:

void callback() {      B操作;  }    void task() {      A操作;      when_X_completed(callback);  }


这里,B操作作为等待X任务后需要执行的内容,被传递给了when_X_completed函数,即逻辑上的下一步被传递了,而when_X_completed函数的实现本身即符合CPS定义。


现在增加条件,A操作需要依赖数据m,B操作同样需要依赖数据m,m的类型为Mtype:

void callback(void *data) {      Mtype *pm = (Mtype *)data;      B操作;  }    void task() {      Mtype m;      A操作;      when_X_completed(callback, &m);  }


这里数据m的地址被传递进入了callback回调函数,由于when_X_completed的实现者无法知道其回调函数依赖的数据的类型(和个数),只能使用void*泛类型指针来处理, 这也是C/C++语言中回调函数原型几乎都存在一个void*指针参数的原因。在同步回调中,上述的代码能够正常运行,但是在异步回调中上述的程序则是错误的,因为异步回调中你无法知道什么时候callback被调用了,若callback在task函数退出后才被调用,那么此时m的地址已经是非法地址了(栈消亡)。这里需要使用全局变量或堆变量来解决:

void callback(void *data) {      Mtype *pm = (Mtype *)data;      B操作;      delete pm;  }    void task() {      Mtype *pm = new Mtype();      A操作;      when_X_completed(callback, pm);  }


如果B操作依赖的数据不止一个,还得将多个数据组合成一个数据结构(如结构体),再将其传递进入when_X_completed函数。在更高级的语言中,解决回调函数中的数据依赖有更简单的办法——闭包,闭包(Closure)是词法闭包(Lexical Closure)的简称,是引用了自由变量的函数。这个被引用的自由变量将和这个函数一同存在,即使已经离开了创造它的环境也不例外。闭包,简单的说就是内部函数可以访问外部函数的变量,即使外部函数已经退出了也不例外。支持闭包的Javascript语言中将是这么实现的:

function task () {      var m;      A操作;      when_X_completed(function () { //这是一个匿名函数          B操作; //这里直接访问m变量      });  }


现在再次增加条件,执行完B操作后需要再执行C操作,C操作也依赖数据m: 

void callback(void *data) {      Mtype *pm = (Mtype *)data;      B操作;      C操作;      delete pm;  }    void task() {      Mtype *pm = new Mtype();      A操作;      when_X_completed(callback, pm);  }


现在再次增加条件,等待X任务完成和执行B操作只在满足条件S的情况下才执行: 

void callback(void *data) {      Mtype *pm = (Mtype *)data;      B操作;      C操作; //重复部分0      delete pm; //重复部分1  }    void task() {      Mtype *pm = new Mtype();      A操作;      if (条件S)          when_X_completed(callback, pm);      else {          C操作; //重复部分0          delete pm; //重复部分1      }  }

这里可以看到相同的代码被分离在callback和task两个函数中。


若等待X任务完成的过程可能失败或错误呢?失败或错误应该抛异常,但是这里的when_X_completed只是注册了等待X任务完成后执行的回调函数,真正执行X任务的地方是哪儿都不知道,如何抛异常呢?事实上异步回调模式是无法在错误时抛异常的,只能将错误信息通过参数传递给回调函数(注:如果回调函数无视了错误,可能会产生不可预知的行为,但抛异常的情况下,无视异常则会让程序直接core掉,显然后者更容易写出更健壮的程序)。若X任务可能错误,且错误时需要将B操作换成D操作: 

void callback(int ret, void *data) { //这里只考虑了返回码      Mtype *pm = (Mtype *)data;      if (ret == 0) //=0表示正确          B操作;      else          D操作;      C操作; //重复部分0      delete pm; //重复部分1  }    void main_task() {      Mtype *pm = new Mtype();      A操作;      if (条件S)          when_X_completed(callback, pm);      else {          C操作; //重复部分0          delete pm; //重复部分1      }  }


现在最后一次增加条件,无论是否满足条件S,都需要等待Y任务完成后并执行E操作,等待Y任务和等待X任务之间可以同时进行(即并发),两者都完成后(或不满足条件S则只需要Y任务和E操作完成后)再继续剩下的流程: 

struct Session {      Mtype m;      int flag; //0x1表示任务X完成,0x2表示任务Y完成  };    void callback_x(int ret, void *data) {      Session ps = (Session *)data;      if (ret == 0) //=0表示正确          B操作;      else          D操作;      ps->flag |= 0x1; //任务X置完成状态      if (ps->flag == (0x1 | 0x2)) {          C操作; //重复部分0          delete ps; //重复部分1      }  }    void callback_y(void *data) {      Session ps = (Session *)data;      E操作;      ps->flag |= 0x2; //任务Y置完成状态      if (ps->flag == (0x1 | 0x2)) {          C操作; //重复部分0          delete ps; //重复部分1      }  }    void task() {      Session *ps = new Session();      ps->flag = 0x0; //状态标志初始化      A操作;      if (条件S)          when_X_completed(callback_x, ps);      else          ps->flag |= 0x1; //由于这里不需要执行X任务,直接置完成位      when_Y_completed(callback_y, ps);  }


这里使用了flag表示其运行过程中的状态机(在异步编程中普遍使用的机制)。 

通过上面的代码演变可以看出异步回调函数的程序是怎样一步一步变复杂难以理解的。实际上整个流程的控制信息被深深地耦合进了各个函数中,单看callback_x函数的实现,其中需要判断任务Y是否完成(即处理X的时候需要知晓Y的存在),当然这里可以将其判断完成后执行C操作以及删除Session的代码再抽象为一个函数:

struct Session {      Mtype m;      int flag; //0x1表示任务X完成,0x2表示任务Y完成  };    void last_doing(Session *ps) {      if (ps->flag == (0x1 | 0x2)) {          C操作;          delete ps;      }  }    void callback_x(int ret, void *data) {      Session ps = (Session *)data;      if (ret == 0) //=0表示正确          B操作;      else          D操作;      ps->flag |= 0x1; //任务X置完成状态      last_doing(ps);  }    void callback_y(void *data) {      Session ps = (Session *)data;      E操作;      ps->flag |= 0x2; //任务Y置完成状态      last_doing(ps);  }    void task() {      Session *ps = new Session();      ps->flag = 0x0; //状态标志初始化      A操作;      if (条件S)          when_X_completed(callback_x, ps);      else          ps->flag |= 0x1; //由于这里不需要执行X任务,直接置完成位      when_Y_completed(callback_y, ps);  }


由于异步回调函数将数据和逻辑分解得支离破碎,使得数据结构和程序流程被耦合进各个函数中,并显示地使用状态机来表示状态,让维护和理解它们变得异常艰难。


      goto是一条可以在许多计算机编程语言中找到的语句,当执行这条语句的时候,它将控制流程无条件地跳转到另一条语句,它的显著特征之一就是人工通过状态机表达流程。正因为异步回调和goto都具有深度耦合控制流程和人工维护状态机的特点,以及它们的代码都是难以阅读的,它们都强大却难以驾驭,所以笔者这里斗胆将异步回调称为了新时代的goto。同样,和大家对goto的批判一样,笔者也是反对乱用/滥用异步回调函数的。



异步模式与并发


      异步编程模式是用来解决并发的。由于等待任务完成这个过程可能是阻塞的,如等待IO操作或等待网络事件。使用异步模式,可以将等待任务完成后执行的内容通过回调函数注册到事件队列中,并立即执行后面的流程,直到全部可执行的流程执行完毕,再集中性地等待相应阻塞事件执行完成(大多通过IO复用机制实现),并执行相应的回调函数。

      回看上小节中的整个过程,用最简单明了的过程式逻辑描述如下:

void task() {      Mtype m;      A操作;      if (条件S) {          await( all_of( X任务, Y任务 ) );          if (任务X.ret == 0) //=0表示执行成功              B操作;          else              D操作;          E操作;      } else {          await( Y任务 );          E操作;      }      C操作;  }


其中的,await可以理解为等待某任务执行完毕,all_of可以理解为多个任务组合成的新的任务(多个任务并发执行且都完成才算完成)。由于要求并未说明X任务完成后执行的B/D操作和Y任务完成后执行的E操作的顺序要求,这里直接让E操作后执行。


      虽然goto的功能很强大,但是人们长久的经验总结,更推荐使用switch/while/for/continue/break/throw/try/catch等来替代goto。(goto不是目的,循环和分支等才是目的,goto只是完成目的的方式)

      那么对于异步回调,有其他方式可以替代它并写出更简单明了的代码么?答案是肯定的。笔者根据常见的异步回调函数用法分析,发现其用异步回调函数的目的大多时是为了表达

  1. 不等待某任务执行完成。
  2. 等待某任务执行完成。
  3. 等待多个任务中的全部执行完成(其间并发)。
  4. 等待多个任务中的任一执行完成(其间并发)。


      那么根据上述的总结,来抽象一下:

  • 任务:并发流程中最小的控制单元,一段逻辑上可能阻塞的过程视为任务。(对阻塞及阻塞的相关行为的抽象)
  • start原语:异步地执行一个任务,该过程不阻塞当前任务的执行,目标任务启动后立即继续。(上段中的目的1
  • await原语:同步地执行一个任务,该过程会阻塞当前任务的执行,等待目标任务执行完成后继续。(上段中的目的2
  • all_of原语:将多个任务组合为一个新的任务,全部任务执行完成后新的任务视为执行完成。(await+all_of即是上段中的目的3
  • any_of原语:将多个任务组合为一个新的任务,任一任务执行完成后新的任务视为执行完成,同时会取消掉其他未执行完成的任务。(await+any_of即是上段中的目的4


      那么基于这个模式下的代码应该如何表达上小节中的程序呢?本小节前文中的最简单明了的过程式逻辑描述就是它的代码了。


      这里使用该模式表达一下常见的服务模型,例如这是一个UDP聚合类服务,对于每个请求,你需要向底层并发查询两种数据,再聚合后返回(或超时后返回):

task business {      udp0.send;      udp1.send;      await (          any_of (              all_of (                  udp0.recv,                  udp1.recv              ),              sleep(x ms)          )      );      if (完成的是任务0)          udp.send; //回成功包      else          udp.send; //回超时包  }    task main {      while(true) {          await( udp.recv );          start( business );      }  }


其中的udp.recv和sleep是原子任务(即不可再分的任务),而business和main则是组合任务(由一系列任务组合而成)。


      可以看出startawaitall_ofany_of它们是如何配合起来描述并发流程的。在这里任务仅仅表达了逻辑,任务本身并不知道也不关心自己在什么并发环境下被执行。而原语才是表达(并发)控制的,再和固有的if、while等配合,控制了整个流程。当然这里,程序=逻辑+控制,逻辑本身只关注自己,不需要关注外层如何使用自己,比如business可以被await(business),也可以被start(business),使得并发中的控制和逻辑解耦。(注:这里的business不能使用函数来封装,只能使用任务来封装,因为函数本身就限定了只有调用和被调用的关系,且函数调用本身就是等待函数执行完毕,实际上是耦合了await+任务。)它的数据和逻辑是完整的(不会被分解),服务的逻辑过程是由语言的固有语法(分支和循环)表达出来的,不需要单独使用状态机表示它的状态,状态信息固有地存在于分支和循环中。这样任务的实现着只需要关注自己的逻辑,不需要知道自己是否需要和另外的任务并发啊等等,并发间的层次清晰明了,让代码变得容易阅读和复用。


      异步编程的目的是解决并发,异步回调函数只是实现这一过程的手段,它并非神圣不可替代的。



异步模式与运行态切换


      在上一小节中已经抽象出了新的模式,但是如何实现呢?(这里的讨论只针对C/C++语言)


      就异步回调函数的实现而言,它只是将某个(需要等待的)任务后需要执行的内容通过回调函数传递下去,然后执行其他流程,以到达某个(需要等待的)任务不直接阻塞掉整个线程的目的。那么如果有办法保持住当前的运行环境,再切换到其他运行环境,待将来某个(需要等待的)任务完成后再继续当前的运行环境,则也能同样达到某个(需要等待的)任务不直接阻塞掉整个线程的目的。对于C/C++语言,一段代码能够正常执行的条件仅仅是正确的数据正确的指令,当前运行环境的数据主要在自身的栈中(也有部分数据在堆中或静态全局区,但这部分数据不受影响)。在X86体系的CPU中,RBP寄存器(32位下是EBP)是栈帧的帧指针,RSP寄存器(32位下是ESP)是栈帧的栈地址,它们两者即表示了栈,而下一条指令则保存在RIP寄存器(32位下是EIP)中,因此只要寄存器归位,就意味着数据和指令归位,则当前运行态就继续往下执行,完全不知道自己被中断过。因此保持住当前的运行环境实际上就是保存当前寄存器值(注:栈不能被修改),切换到其他运行环境实际上就是替换其寄存器值,继续当前的运行环境实际上就是恢复当前运行环境的寄存器值。当然运行态切换不需要写如此底层的汇编代码,Linux下glibc中的ucontext和Windows下的Fiber都是基于上述的封装,直接用它们就可以了。


      有了运行态切换机制,但如何知晓什么时候应该切换回来呢(切换走当然是触发阻塞操作的时候)?这里还需要一个异步事件队列,将真正的阻塞操作委托给异步事件队列,那么对于任意的阻塞操作,可以等价替换为:注册事件完成后的回调函数释放当前控制权待事件完成时触发回调重获控制权。整个过程如下图:

cocoflow - 并发中的流程控制

其中,释放当前控制权是从当前的运行态X切走,这里往哪儿切不由任务本身决定,而由触发任务的原语决定(任务本身只表达逻辑,控制信息交给原语负责)。重获控制权即是切换回当前的运行态X。


      有了上面的等价替换规则后,再为每个任务增加两个描述字段即可完成全部原语了。这两个字段是:

  • block_to阻塞后释放往哪儿切
  • finish_to完成后往哪儿切


      对于start原语,其逻辑是start的目标任务不阻塞当前任务执行。那么对于start原语,将其目标任务的block_to指向自己(即阻塞后立即切换回来让当前任务继续执行),finish_to指向事件队列(start只是启动目标任务,然后两者就无任何关联了),然后再切换至目标任务运行态即可。start原语的执行过程如下图:

cocoflow - 并发中的流程控制

      对于await原语,其逻辑是await的目标任务阻塞当前任务执行,且目标任务完成后继续当前执行当前任务。那么对于await原语,将目标任务的block_to指向事件队列(即释放掉阻塞掉当前运行态),finish_to指向自己(即完成后继续执行当前运行态),然后再切换至目标任务运行态即可(注:await的意义实际上和函数调用相同,都是等待任务/函数执行完成,真正的实现对于await可以采用复用运行态来优化,等价于函数调用的过程)。await原语的执行过程如下图:

cocoflow - 并发中的流程控制

对于all_of原语,其逻辑是all_of的目标任务间并行,且全部执行完成,all_of本身才执行完成。那么对于all_of原语,将全部目标任务的block_tofinish_to都指向自己,然后再依次切换至目标任务的运行态以启动它们,并且全部任务启动后,循环检测是否全部执行完毕,若未执行完毕,则视为被阻塞,即释放自己,直至全部目标任务执行完成。all_of原语的执行过程如下图:

cocoflow - 并发中的流程控制

(注:这里假定了任务X先执行完成)


对于any_of原语,其逻辑是any_of的目标任务间并行,且任一执行完成,any_of本身就执行完成。那么对于any_of原语,将全部目标任务的block_tofinish_to都指向自己,然后再依次切换至目标任务的运行态以启动它们,并且全部任务启动后,循环检测是否任一执行完毕,若均未执行完毕,则视为被阻塞,即释放自己,直至任一目标任务执行完成。any_of原语的执行过程如下图:

cocoflow - 并发中的流程控制

(注:这里假定了任务X先执行完成)


      上面的原语逻辑描述和示意图采用的是最简单的情况,实际的处理会比上面的描述略复杂一些,但其本质就是上面描述的那些。谈了这么多,笔者并非纸上谈兵,下面则来介绍下笔者自己实现的基于上面模式的框架。


      cocoflow:全称Concurrency Control Flow,即并发流程控制,是笔者实现的一个开源框架。它是一个基于协程和libuv的C++框架,仅通过startawaitall_ofany_of控制流程。目前被托管在github上(https://github.com/chishaxie/cocoflow)。其协程就是前面提到的运行态切换了(也就是Linux下的ucontext和Windows下的Fiber)。而libuv则是一个高性能跨平台且功能强大的异步事件库(NodeJS的核心事件库)。


      在C++代码形态上,任何和原语分别如下:

  • 任务:抽象类task(子类需要实现void run();函数,即业务逻辑)
  • start原语:int start(task* target);函数
  • await原语:int await(task& target);函数
  • all_of原语:抽象类task的子类all_of
  • any_of原语:抽象类task的子类any_of


      对于cocoflow,最核心的部分就是上面的任务和控制原语了,但实际情况是,只有原语,无法描述真正的业务。当然cocoflow也会提供一些功能API(即具体实现的某些task类):

  • sleep:休息,休息x毫秒
  • sync:同步,类似信号。一方等待同步(信号),另一发发送。
  • udp::send:发送udp数据包
  • udp::recv:接收udp数据包
  • udp::recv_by_seq:接收指定序列号的udp数据包
  • tcp::accept:接受连接建立请求
  • tcp::connect:发起建立连接请求
  • tcp::send:发送tcp数据包
  • tcp::recv:接收tcp数据包
  • tcp::recv_till:接收tcp数据包直到匹配某模式串/Buffer填满
  • tcp::recv_by_seq:接收指定序列号的tcp数据包


      有了控制原语和功能API,则就可以写出优雅又实用的代码了。更详尽的文档请参考:cocoflow wiki页

来自:http://my.oschina.net/chishaxie/blog/373073