Redis 和 I/O 多路复用

bless2012 8年前
   <p>最近在看 UNIX 网络编程并研究了一下 Redis 的实现,感觉 Redis 的源代码十分适合阅读和分析,其中 I/O 多路复用(mutiplexing)部分的实现非常干净和优雅,在这里想对这部分的内容进行简单的整理。</p>    <h2>几种 I/O 模型</h2>    <p>为什么 Redis 中要使用 I/O 多路复用这种技术呢?</p>    <p>首先,Redis 是跑在单线程中的,所有的操作都是按照顺序线性执行的,但是由于读写操作等待用户输入或输出都是阻塞的,所以 I/O 操作在一般情况下往往不能直接返回,这会导致某一文件的 I/O 阻塞导致整个进程无法对其它客户提供服务,而 <strong>I/O 多路复用</strong> 就是为了解决这个问题而出现的。</p>    <h3>Blocking I/O</h3>    <p>先来看一下传统的阻塞 I/O 模型到底是如何工作的:当使用 read 或者 write 对某一个 <strong>文件描述符(File Descriptor 以下简称 FD)</strong> 进行读写时,如果当前 FD 不可读或不可写,整个 Redis 服务就不会对其它的操作作出响应,导致整个服务不可用。</p>    <p>这也就是传统意义上的,也就是我们在编程中使用最多的阻塞模型:</p>    <p><img src="https://simg.open-open.com/show/52d56446ea14858270c2e010aaa76049.png"></p>    <p>阻塞模型虽然开发中非常常见也非常易于理解,但是由于它会影响其他 FD 对应的服务,所以在需要处理多个客户端任务的时候,往往都不会使用阻塞模型。</p>    <h3>I/O 多路复用</h3>    <p>虽然还有很多其它的 I/O 模型,但是在这里都不会具体介绍。</p>    <p>阻塞式的 I/O 模型并不能满足这里的需求,我们需要一种效率更高的 I/O 模型来支撑 Redis 的多个客户(redis-cli),这里涉及的就是 I/O 多路复用模型了:</p>    <p><img src="https://simg.open-open.com/show/030b0e6979820b18afeab3ebdd504224.png"></p>    <p>在 I/O 多路复用模型中,最重要的函数调用就是 select ,该方法的能够同时监控多个文件描述符的可读可写情况,当其中的某些文件描述符可读或者可写时, select 方法就会返回可读以及可写的文件描述符个数。</p>    <p>关于 select 的具体使用方法,在网络上资料很多,这里就不过多展开介绍了;</p>    <p>与此同时也有其它的 I/O 多路复用函数 epoll/kqueue/evport ,它们相比 select 性能更优秀,同时也能支撑更多的服务。</p>    <h2>Reactor 设计模式</h2>    <p>Redis 服务采用 Reactor 的方式来实现文件事件处理器(每一个网络连接其实都对应一个文件描述符)</p>    <p><img src="https://simg.open-open.com/show/76dd4fd3afdb37e96016982dd14bfbf4.png"></p>    <p>文件事件处理器使用 I/O 多路复用模块同时监听多个 FD,当 accept 、 read 、 write 和 close 文件事件产生时,文件事件处理器就会回调 FD 绑定的事件处理器。</p>    <p>虽然整个文件事件处理器是在单线程上运行的,但是通过 I/O 多路复用模块的引入,实现了同时对多个 FD 读写的监控,提高了网络通信模型的性能,同时也可以保证整个 Redis 服务实现的简单。</p>    <h2>I/O 多路复用模块</h2>    <p>I/O 多路复用模块封装了底层的 select 、 epoll 、 avport 以及 kqueue 这些 I/O 多路复用函数,为上层提供了相同的接口。</p>    <p><img src="https://simg.open-open.com/show/93c716f502a769042cd26f357972b687.jpg"></p>    <p>在这里我们简单介绍 Redis 是如何包装 select 和 epoll 的,简要了解该模块的功能,整个 I/O 多路复用模块抹平了不同平台上 I/O 多路复用函数的差异性,提供了相同的接口:</p>    <ul>     <li>static int aeApiCreate(aeEventLoop *eventLoop)</li>     <li>static int aeApiResize(aeEventLoop *eventLoop, int setsize)</li>     <li>static void aeApiFree(aeEventLoop *eventLoop)</li>     <li>static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)</li>     <li>static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)</li>     <li>static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)</li>    </ul>    <p>同时,因为各个函数所需要的参数不同,我们在每一个子模块内部通过一个 aeApiState 来存储需要的上下文信息:</p>    <pre>  <code class="language-cpp">// select  typedef struct aeApiState {        fd_set rfds, wfds;      fd_set _rfds, _wfds;  } aeApiState;    // epoll  typedef struct aeApiState {        int epfd;      struct epoll_event *events;  } aeApiState;</code></pre>    <p>这些上下文信息会存储在 eventLoop 的 void *state 中,不会暴露到上层,只在当前子模块中使用。</p>    <h3>封装 select 函数</h3>    <p>select 可以监控 FD 的可读、可写以及出现错误的情况。</p>    <p>在介绍 I/O 多路复用模块如何对 select 函数封装之前,先来看一下 select 函数使用的大致流程:</p>    <pre>  <code class="language-cpp">int fd = /* file descriptor */    fd_set rfds;    FD_ZERO(&rfds);    FD_SET(fd, &rfds)    for ( ; ; ) {        select(fd+1, &rfds, NULL, NULL, NULL);      if (FD_ISSET(fd, &rfds)) {          /* file descriptor `fd` becomes readable */      }  }</code></pre>    <ol>     <li>初始化一个可读的 fd_set 集合,保存需要监控可读性的 FD;</li>     <li>使用 FD_SET 将 fd 加入 rfds ;</li>     <li>调用 select 方法监控 rfds 中的 FD 是否可读;</li>     <li>当 select 返回时,检查 FD 的状态并完成对应的操作。</li>    </ol>    <p>而在 Redis 的 ae_select 文件中代码的组织顺序也是差不多的,首先在 aeApiCreate 函数中初始化 rfds 和 wfds :</p>    <pre>  <code class="language-cpp">static int aeApiCreate(aeEventLoop *eventLoop) {        aeApiState *state = zmalloc(sizeof(aeApiState));      if (!state) return -1;      FD_ZERO(&state->rfds);      FD_ZERO(&state->wfds);      eventLoop->apidata = state;      return 0;  }</code></pre>    <p>而 aeApiAddEvent 和 aeApiDelEvent 会通过 FD_SET 和 FD_CLR 修改 fd_set 中对应 FD 的标志位:</p>    <pre>  <code class="language-cpp">static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {        aeApiState *state = eventLoop->apidata;      if (mask & AE_READABLE) FD_SET(fd,&state->rfds);      if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);      return 0;  }</code></pre>    <p>整个 ae_select 子模块中最重要的函数就是 aeApiPoll ,它是实际调用 select 函数的部分,其作用就是在 I/O 多路复用函数返回时,将对应的 FD 加入 aeEventLoop 的 fired 数组中,并返回事件的个数:</p>    <pre>  <code class="language-cpp">static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {        aeApiState *state = eventLoop->apidata;      int retval, j, numevents = 0;        memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));      memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));        retval = select(eventLoop->maxfd+1,                  &state->_rfds,&state->_wfds,NULL,tvp);      if (retval > 0) {          for (j = 0; j <= eventLoop->maxfd; j++) {              int mask = 0;              aeFileEvent *fe = &eventLoop->events[j];                if (fe->mask == AE_NONE) continue;              if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))                  mask |= AE_READABLE;              if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))                  mask |= AE_WRITABLE;              eventLoop->fired[numevents].fd = j;              eventLoop->fired[numevents].mask = mask;              numevents++;          }      }      return numevents;  }</code></pre>    <h3>封装 epoll 函数</h3>    <p>Redis 对 epoll 的封装其实也是类似的,使用 epoll_create 创建 epoll 中使用的 epfd :</p>    <pre>  <code class="language-cpp">static int aeApiCreate(aeEventLoop *eventLoop) {        aeApiState *state = zmalloc(sizeof(aeApiState));        if (!state) return -1;      state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);      if (!state->events) {          zfree(state);          return -1;      }      state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */      if (state->epfd == -1) {          zfree(state->events);          zfree(state);          return -1;      }      eventLoop->apidata = state;      return 0;  }</code></pre>    <p>在 aeApiAddEvent 中使用 epoll_ctl 向 epfd 中添加需要监控的 FD 以及监听的事件:</p>    <pre>  <code class="language-cpp">static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {        aeApiState *state = eventLoop->apidata;      struct epoll_event ee = {0}; /* avoid valgrind warning */      /* If the fd was already monitored for some event, we need a MOD       * operation. Otherwise we need an ADD operation. */      int op = eventLoop->events[fd].mask == AE_NONE ?              EPOLL_CTL_ADD : EPOLL_CTL_MOD;        ee.events = 0;      mask |= eventLoop->events[fd].mask; /* Merge old events */      if (mask & AE_READABLE) ee.events |= EPOLLIN;      if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;      ee.data.fd = fd;      if (epoll_ctl(state->epfd,op,fd,ⅇ) == -1) return -1;      return 0;  }</code></pre>    <p>由于 epoll 相比 select 机制略有不同,在 epoll_wait 函数返回时并不需要遍历所有的 FD 查看读写情况;在 epoll_wait 函数返回时会提供一个 epoll_event 数组:</p>    <pre>  <code class="language-cpp">typedef union epoll_data {        void    *ptr;      int      fd; /* 文件描述符 */      uint32_t u32;      uint64_t u64;  } epoll_data_t;    struct epoll_event {        uint32_t     events; /* Epoll 事件 */      epoll_data_t data;  };</code></pre>    <p>其中保存了发生的 epoll 事件( EPOLLIN 、 EPOLLOUT 、 EPOLLERR 和 EPOLLHUP )以及发生该事件的 FD。</p>    <p>aeApiPoll 函数只需要将 epoll_event 数组中存储的信息加入 eventLoop 的 fired 数组中,将信息传递给上层模块:</p>    <pre>  <code class="language-cpp">static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {        aeApiState *state = eventLoop->apidata;      int retval, numevents = 0;        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,              tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);      if (retval > 0) {          int j;            numevents = retval;          for (j = 0; j < numevents; j++) {              int mask = 0;              struct epoll_event *e = state->events+j;                if (e->events & EPOLLIN) mask |= AE_READABLE;              if (e->events & EPOLLOUT) mask |= AE_WRITABLE;              if (e->events & EPOLLERR) mask |= AE_WRITABLE;              if (e->events & EPOLLHUP) mask |= AE_WRITABLE;              eventLoop->fired[j].fd = e->data.fd;              eventLoop->fired[j].mask = mask;          }      }      return numevents;  }</code></pre>    <h3>子模块的选择</h3>    <p>因为 Redis 需要在多个平台上运行,同时为了最大化执行的效率与性能,所以会根据编译平台的不同选择不同的 I/O 多路复用函数作为子模块,提供给上层统一的接口;在 Redis 中,我们通过宏定义的使用,合理的选择不同的子模块:</p>    <pre>  <code class="language-cpp">#ifdef HAVE_EVPORT  #include "ae_evport.c"  #else      #ifdef HAVE_EPOLL      #include "ae_epoll.c"      #else          #ifdef HAVE_KQUEUE          #include "ae_kqueue.c"          #else          #include "ae_select.c"          #endif      #endif  #endif</code></pre>    <p>因为 select 函数是作为 POSIX 标准中的系统调用,在不同版本的操作系统上都会实现,所以将其作为保底方案:</p>    <p><img src="https://simg.open-open.com/show/67a8696989372f3760af7864cafa164a.jpg"></p>    <p>Redis 会优先选择时间复杂度为 $O(1)$ 的 I/O 多路复用函数作为底层实现,包括 Solaries 10 中的 evport 、Linux 中的 epoll 和 macOS/FreeBSD 中的 kqueue ,上述的这些函数都使用了内核内部的结构,并且能够服务几十万的文件描述符。</p>    <p>但是如果当前编译环境没有上述函数,就会选择 select 作为备选方案,由于其在使用时会扫描全部监听的描述符,所以其时间复杂度较差 $O(n)$,并且只能同时服务 1024 个文件描述符,所以一般并不会以 select 作为第一方案使用。</p>    <h2>总结</h2>    <p>Redis 对于 I/O 多路复用模块的设计非常简洁,通过宏保证了 I/O 多路复用模块在不同平台上都有着优异的性能,将不同的 I/O 多路复用函数封装成相同的 API 提供给上层使用。</p>    <p>整个模块使 Redis 能以单进程运行的同时服务成千上万个文件描述符,避免了由于多进程应用的引入导致代码实现复杂度的提升,减少了出错的可能性。</p>    <h2>Reference</h2>    <ul>     <li><a href="/misc/goto?guid=4959726623524331691" rel="nofollow,noindex">Select-Man-Pages</a></li>     <li><a href="/misc/goto?guid=4959726623615112874" rel="nofollow,noindex">Reactor-Pattern</a></li>     <li><a href="/misc/goto?guid=4959726623698210521" rel="nofollow,noindex">epoll vs kqueue</a></li>    </ul>    <h2>其它</h2>    <p>Follow: <a href="/misc/goto?guid=4959673789843239110" rel="nofollow,noindex">Draveness · GitHub</a></p>    <p>Source: <a href="/misc/goto?guid=4959726623811288814" rel="nofollow,noindex">http://draveness.me/redis-io-multiplexing</a></p>    <p> </p>    <p>来自:http://draveness.me/redis-io-multiplexing/</p>    <p> </p>