Async 模块实现入门浅析

summeryct 8年前
   <p><img src="https://simg.open-open.com/show/b0075fad0613436664f7e00e98ff7bdf.png"></p>    <p>在早期的异步开发中, <a href="/misc/goto?guid=4958534319587992200" rel="nofollow,noindex"> Async </a> 模块是比较有名的异步解决方案。本文会带大家简单看一下 async 模块的几个方法实现思路,具体分别是:</p>    <ul>     <li>async.waterfall</li>     <li>async.each</li>     <li>async.eachLimit</li>     <li>async.whilst</li>    </ul>    <p>PS:本文有相应视频—— <a href="/misc/goto?guid=4959749837310973711">优酷地址 </a><a href="/misc/goto?guid=4959749837402632176" rel="nofollow,noindex"> </a> (声音据说有点小)。</p>    <p> </p>    <h2><strong>waterfall</strong></h2>    <p>我们先来看下一个 async.waterfall 的简单使用场景登录:</p>    <pre>  <code class="language-javascript">async.waterfall([    function (next) {      user.get(name, next);    },    function (user, next) {      if (!user) {        return next(new Error('user not found'));      }      if (passwd != user.passwd) {        return next(new Error('wrong password'));      }      sign.up(name, next);    },    function (reward, next) {      resource.add(name, reward, next);    },  ], function (err, ...res) {    if (err) {      console.error(err.stack);    }    console.log(res);  });</code></pre>    <p>async 的思路是将原本容易出现 callback hell 的嵌套,通过数组并列的方式抹平,并且节省每次判断 error 的代码,按照 <a href="/misc/goto?guid=4959749837491577907" rel="nofollow,noindex"> error first </a> 的约定在内部每次都帮助用户检查异步是否出错。了解了这种想法之后我们可以写个很简单的 waterfall 出来。</p>    <pre>  <code class="language-javascript">// 确认整体结构  exports.waterfall = function (task = [], callback = noop) { // 默认值    // 拿到 callback 数组    if (!Array.isArray(task)) {      return callback(new Error('task should be an array!'));    }      // TODO  };    function noop() {}</code></pre>    <p>拿到了 callback 数组之后,我们需要想办法,让这个数组串联的执行起来,即从数组的第一个 callback 开始,一个执行完就自动调用下一个 callback:</p>    <pre>  <code class="language-javascript">exports.waterfall = function (task = [], callback = noop) {    if (!Array.isArray(task)) {      return callback(new Error('task should be an array!'));    }      (function next() {      // 取出数组中的第一个 callback 执行      let fn = task.shift();      fn.apply(null, [next]); // ①用户自行调用这个 next    })();  };</code></pre>    <p>关于 ① 处流程自行走下去结合这里看看:</p>    <pre>  <code class="language-javascript">async.waterfall([      function(callback) {          callback(null, 'one', 'two'); // ①这里 callback 就调用了 next      },  // ...</code></pre>    <p>理解了这个剩下的就比较好办了:</p>    <pre>  <code class="language-javascript">exports.waterfall = function (task = [], callback = noop) {    if (!Array.isArray(task)) {      return callback(new Error('task should be an array!'));    }      (function next(...args) { // args 获取上一个 callback 传的结果      if (args[0]) { // error first 约定        // 发现第一个参数存在 error 直接返回结束整个流程        return callback(args[0]);      }        if (task.length) { // 判断 callback 是不是执行完了        let fn = task.shift();        // ② 将 args 平摊到下一个 cb 的开头,next 位于最后        fn.apply(null, [...args.slice(1), next]);      } else {        // 如果执行完了就结束流程        callback.apply(null, args);      }    })();  };</code></pre>    <p>关于 ② 可以结合例子来看:</p>    <pre>  <code class="language-javascript">async.waterfall([      function(callback) {          callback(null, 'one', 'two');      },      function(arg1, arg2, callback) { // ②          // arg1 now equals 'one' and arg2 now equals 'two'          callback(null, 'three');      },      function(arg1, callback) {          // arg1 now equals 'three'          callback(null, 'done');      }  ], function (err, result) {      // result now equals 'done'  });</code></pre>    <p>那么到这里一个简单的 waterfall 的实现思路已经完全展现出来了。最后说一下可能出现的问题,比如用户多调了一次 cb (这种情况确实可能出现)所以我们需要做一些简单的预防:</p>    <pre>  <code class="language-javascript">exports.waterfall = function (task = [], callback = noop) {    if (!Array.isArray(task)) {      return callback(new Error('task should be an array!'));    }      (function next(...args) {      if (args[0]) {        return callback(args[0]);      }        if (task.length) {        let fn = task.shift();        fn.apply(null, [...args.slice(1), onlyOnce(next)]); // 保证只被调用一次      } else {        callback.apply(null, args);      }    })();  };    function onlyOnce(cb) {    let flag = false;    return (...args) => {      if (flag) {        return cb(new Error('cb already called'));      }      cb.apply(null, args);      flag = true;    };  }</code></pre>    <p> </p>    <h2><strong>each</strong></h2>    <p>async.each 有点像是异步的 arr.map 操作。我们可以来看一个使用的例子:</p>    <pre>  <code class="language-javascript">'use strict';    const fs = require('fs');  const async = require('async');  const request = require('request');    const sites = ['www.baidu.com','github.com','www.npmjs.com', 'www.zhihu.com'];    // 下站站点图标  function downloadFavicon(site, next) {    let addr = `https://${site}/favicon.ico`;    let file = `./${site}.ico`;    request.get(addr)      .pipe(fs.createWriteStream(file))      .on('error', (err) => {        console.error(`${url} Download failed: ${err.message}`);        next();      })      .on('finish', next);  }    // 下载每一个站点的图标  async.each(sites, downloadFavicon, function (err) {    if (err) {      console.log('err', err);    }    console.log('over');  });</code></pre>    <p>那么按照例子,我们可以先来搭一个 async.each 的架子:</p>    <pre>  <code class="language-javascript">exports.each = function (items = [], iterator, callback = noop) {    // 判断数组类型    if (!Array.isArray(items)) {      return callback(new Error('items should be an array!'));    }      // 判断迭代器    if (typeof iterator != 'function') {      return callback(new Error('iterator should be a function!'));    }      // TODO  };</code></pre>    <p>然后我们需要做的事情很简单,只需要将数组的每个一个元素作为参数拿来调用 iterator 函数即可:</p>    <pre>  <code class="language-javascript">exports.each = function (items = [], iterator, callback = noop) {    if (!Array.isArray(items)) {      return callback(new Error('items should be an array!'));    }      if (typeof iterator != 'function') {      return callback(new Error('iterator should be a function!'));    }      function next(err) {      // TODO    }      items.map((item) => iterator(item, next));  };</code></pre>    <p>然后我们要想办法在所有的异步操作都执行完之后调用 callback 出去</p>    <pre>  <code class="language-javascript">exports.each = function (items = [], iterator, callback = noop) {    if (!Array.isArray(items)) {      return callback(new Error('items should be an array!'));    }      if (typeof iterator != 'function') {      return callback(new Error('iterator should be a function!'));    }      let completed = 0; // 计数      function next(err) {      if (err) { // error first        return callback(err); // 结束流程      }        if (++completed >= items.length) { // 计数判断        callback(); // 流程结束      }    }      items.map((item) => iterator(item, next));  };</code></pre>    <p>async.each 的实现思路确实如上述例子一样简单,当然还可能会有一些复杂的情况需要判断,更深入的内容各位可以移步 Async 官方的 <a href="/misc/goto?guid=4959749837570189188" rel="nofollow,noindex"> each 实现 </a> 中查看更多。</p>    <h2><strong>eachLimit</strong></h2>    <p>使用 each 执行操作的时候,在量小的情况下是没有问题的,但是当异步操作的量特别大的时候,就需要对其进行一定的控制。比如写一个爬虫去某种网站上爬图片,那么将图片下载到本地的过程中存在一个文件描述符的限制,即同时打开的文件(保存图片时需要openFile)数目超过一定程度就会收到操作系统的报错。</p>    <p>以 each 中出现过的例子来说 eachLimit 的功能:</p>    <pre>  <code class="language-javascript">const sites = [ ... ]; // 可能非常多站点    // 对 each 操作做 limit,同时最多下载 100 个站点图标  async.eachLimit(sites, 100, downloadFavicon, function (err) {    if (err) {      console.log('err', err);    }    console.log('over');  });</code></pre>    <p>了解了上述需求之后,我们来搭一个 eachLimit 的架子:</p>    <pre>  <code class="language-javascript">exports.eachLimit = function (items = [], limit = 1, iterator, callback = noop) {    if (!Array.isArray(items)) {      return callback(new Error('items should be an array!'));    }      if (typeof iterator != 'function') {      return callback(new Error('iterator should be a function!'));    }      // 同时执行的异步操作数目 (不能超过 limit)    let running = 0;      // TODO  };</code></pre>    <p>我首先需要一个循环来将异步操作加入到执行队列,但是只能加到 limit 的数目为止:</p>    <pre>  <code class="language-javascript">let running = 0;    (function next() {    while (running < limit) { // 一口气加到队列满为止      let item = items.shift();      running++;        iterator(item, (err) => {        running--;        next(); // 每执行完一个异步操作就触发一下加入队列的行为      });    }  })();</code></pre>    <p>然后加上结束的操作:</p>    <pre>  <code class="language-javascript">let done = false;  let running = 0;    (function next() {    if (done && running <= 0) {      return callback();    }      while (running < limit) {      let item = items.shift();      running++;      if (item === undefined) {        done = true;        if (running <= 0) {          callback();        }        return;      }        iterator(item, (err) => {        running--;        next();      });    }  })();</code></pre>    <p>最后补上错误处理的完整版:</p>    <pre>  <code class="language-javascript">exports.eachLimit = function (items = [], limit = 1, iterator, callback = noop) {    if (!Array.isArray(items)) {      return callback(new Error('items should be an array!'));    }      if (typeof iterator != 'function') {      return callback(new Error('iterator should be a function!'));    }      let done = false;    let running = 0;    let errored = false;      (function next() {      if (done && running <= 0) {        return callback();      }        while (running < limit && !errored) {        let item = items.shift();        running++;        if (item === undefined) {          done = true;          if (running <= 0) {            callback();          }          return;        }          iterator(item, (err) => {          running--;          if (err) {            errored = true;            return callback(err);          }          next();        });      }    })();  };</code></pre>    <p> </p>    <h2><strong>whilst</strong></h2>    <p>最后我们来看一个循环异步 whilst 的实现,也是非常的简单。我们先看看使用例子:</p>    <pre>  <code class="language-javascript">'use strict';    const async = require('async');    let count = 0;  async.whilst(    function () { return count < 5; },    function (callback) {      console.log('count', count++);      setTimeout(callback, 1000);    },    function (err) {      console.log('over');    }  );</code></pre>    <p>然后因为比较简单,直接来看代码吧:</p>    <pre>  <code class="language-javascript">exports.whilst = function (test, iterator, callback = noop) {    if (typeof test != 'function') {      return callback(new Error('iterator should be a function!'));    }    if (typeof iterator != 'function') {      return callback(new Error('iterator should be a function!'));    }      (function next() {      if (test()) {        iterator((err) => {          if (err) {            return callback(err);          }          next();        });      }    })();  };</code></pre>    <p> </p>    <h2><strong>小结</strong></h2>    <p>综上,本文为 callback 的异步流程封装控制的思路做了一点微小的整理工作。实现上并没有完全遵循原版,而是选择使用 es6 的新特性劲量让代码看起来简(zhuang)洁(bi),整体上是为了展现一个思路可能有不少细节没有处理,完整的部分参见 <a href="/misc/goto?guid=4959749837655770493" rel="nofollow,noindex"> async 官方文档 </a></p>    <p>。</p>    <p>async 的优点可以简单的说,由于 async 基于原生的 callback 所以相比 promise/co 等方式性较好(目前最快的方式是专门优化了速度的 <a href="/misc/goto?guid=4959749837731304539" rel="nofollow,noindex"> neo-async </a> )。并且 async 提供了非常多、非常全面的 60+ 种异步操作方式,功能可谓十分强大。</p>    <p>最后简单提一下 async 的一些缺点:</p>    <ol>     <li>基于 error first 的 <strong>约定</strong> 。约定的意思就是不是强制的,也就存在不了解这个约定或者使用错误方面的问题。</li>     <li>流程没有状态。</li>     <li>由于功能太过强大(如 async.auto)存在可能滥用的问题。</li>     <li>错误栈曲折排查困难。</li>    </ol>    <p> </p>    <p>来自:https://zhuanlan.zhihu.com/p/27303127</p>    <p> </p>