Async 模块实现入门浅析
summeryct
7年前
<p><img src="https://simg.open-open.com/show/b0075fad0613436664f7e00e98ff7bdf.png"></p> <p>在早期的异步开发中, <a href="/misc/goto?guid=4958534319587992200" rel="nofollow,noindex"> Async </a> 模块是比较有名的异步解决方案。本文会带大家简单看一下 async 模块的几个方法实现思路,具体分别是:</p> <ul> <li>async.waterfall</li> <li>async.each</li> <li>async.eachLimit</li> <li>async.whilst</li> </ul> <p>PS:本文有相应视频—— <a href="/misc/goto?guid=4959749837310973711">优酷地址 </a><a href="/misc/goto?guid=4959749837402632176" rel="nofollow,noindex"> </a> (声音据说有点小)。</p> <p> </p> <h2><strong>waterfall</strong></h2> <p>我们先来看下一个 async.waterfall 的简单使用场景登录:</p> <pre> <code class="language-javascript">async.waterfall([ function (next) { user.get(name, next); }, function (user, next) { if (!user) { return next(new Error('user not found')); } if (passwd != user.passwd) { return next(new Error('wrong password')); } sign.up(name, next); }, function (reward, next) { resource.add(name, reward, next); }, ], function (err, ...res) { if (err) { console.error(err.stack); } console.log(res); });</code></pre> <p>async 的思路是将原本容易出现 callback hell 的嵌套,通过数组并列的方式抹平,并且节省每次判断 error 的代码,按照 <a href="/misc/goto?guid=4959749837491577907" rel="nofollow,noindex"> error first </a> 的约定在内部每次都帮助用户检查异步是否出错。了解了这种想法之后我们可以写个很简单的 waterfall 出来。</p> <pre> <code class="language-javascript">// 确认整体结构 exports.waterfall = function (task = [], callback = noop) { // 默认值 // 拿到 callback 数组 if (!Array.isArray(task)) { return callback(new Error('task should be an array!')); } // TODO }; function noop() {}</code></pre> <p>拿到了 callback 数组之后,我们需要想办法,让这个数组串联的执行起来,即从数组的第一个 callback 开始,一个执行完就自动调用下一个 callback:</p> <pre> <code class="language-javascript">exports.waterfall = function (task = [], callback = noop) { if (!Array.isArray(task)) { return callback(new Error('task should be an array!')); } (function next() { // 取出数组中的第一个 callback 执行 let fn = task.shift(); fn.apply(null, [next]); // ①用户自行调用这个 next })(); };</code></pre> <p>关于 ① 处流程自行走下去结合这里看看:</p> <pre> <code class="language-javascript">async.waterfall([ function(callback) { callback(null, 'one', 'two'); // ①这里 callback 就调用了 next }, // ...</code></pre> <p>理解了这个剩下的就比较好办了:</p> <pre> <code class="language-javascript">exports.waterfall = function (task = [], callback = noop) { if (!Array.isArray(task)) { return callback(new Error('task should be an array!')); } (function next(...args) { // args 获取上一个 callback 传的结果 if (args[0]) { // error first 约定 // 发现第一个参数存在 error 直接返回结束整个流程 return callback(args[0]); } if (task.length) { // 判断 callback 是不是执行完了 let fn = task.shift(); // ② 将 args 平摊到下一个 cb 的开头,next 位于最后 fn.apply(null, [...args.slice(1), next]); } else { // 如果执行完了就结束流程 callback.apply(null, args); } })(); };</code></pre> <p>关于 ② 可以结合例子来看:</p> <pre> <code class="language-javascript">async.waterfall([ function(callback) { callback(null, 'one', 'two'); }, function(arg1, arg2, callback) { // ② // arg1 now equals 'one' and arg2 now equals 'two' callback(null, 'three'); }, function(arg1, callback) { // arg1 now equals 'three' callback(null, 'done'); } ], function (err, result) { // result now equals 'done' });</code></pre> <p>那么到这里一个简单的 waterfall 的实现思路已经完全展现出来了。最后说一下可能出现的问题,比如用户多调了一次 cb (这种情况确实可能出现)所以我们需要做一些简单的预防:</p> <pre> <code class="language-javascript">exports.waterfall = function (task = [], callback = noop) { if (!Array.isArray(task)) { return callback(new Error('task should be an array!')); } (function next(...args) { if (args[0]) { return callback(args[0]); } if (task.length) { let fn = task.shift(); fn.apply(null, [...args.slice(1), onlyOnce(next)]); // 保证只被调用一次 } else { callback.apply(null, args); } })(); }; function onlyOnce(cb) { let flag = false; return (...args) => { if (flag) { return cb(new Error('cb already called')); } cb.apply(null, args); flag = true; }; }</code></pre> <p> </p> <h2><strong>each</strong></h2> <p>async.each 有点像是异步的 arr.map 操作。我们可以来看一个使用的例子:</p> <pre> <code class="language-javascript">'use strict'; const fs = require('fs'); const async = require('async'); const request = require('request'); const sites = ['www.baidu.com','github.com','www.npmjs.com', 'www.zhihu.com']; // 下站站点图标 function downloadFavicon(site, next) { let addr = `https://${site}/favicon.ico`; let file = `./${site}.ico`; request.get(addr) .pipe(fs.createWriteStream(file)) .on('error', (err) => { console.error(`${url} Download failed: ${err.message}`); next(); }) .on('finish', next); } // 下载每一个站点的图标 async.each(sites, downloadFavicon, function (err) { if (err) { console.log('err', err); } console.log('over'); });</code></pre> <p>那么按照例子,我们可以先来搭一个 async.each 的架子:</p> <pre> <code class="language-javascript">exports.each = function (items = [], iterator, callback = noop) { // 判断数组类型 if (!Array.isArray(items)) { return callback(new Error('items should be an array!')); } // 判断迭代器 if (typeof iterator != 'function') { return callback(new Error('iterator should be a function!')); } // TODO };</code></pre> <p>然后我们需要做的事情很简单,只需要将数组的每个一个元素作为参数拿来调用 iterator 函数即可:</p> <pre> <code class="language-javascript">exports.each = function (items = [], iterator, callback = noop) { if (!Array.isArray(items)) { return callback(new Error('items should be an array!')); } if (typeof iterator != 'function') { return callback(new Error('iterator should be a function!')); } function next(err) { // TODO } items.map((item) => iterator(item, next)); };</code></pre> <p>然后我们要想办法在所有的异步操作都执行完之后调用 callback 出去</p> <pre> <code class="language-javascript">exports.each = function (items = [], iterator, callback = noop) { if (!Array.isArray(items)) { return callback(new Error('items should be an array!')); } if (typeof iterator != 'function') { return callback(new Error('iterator should be a function!')); } let completed = 0; // 计数 function next(err) { if (err) { // error first return callback(err); // 结束流程 } if (++completed >= items.length) { // 计数判断 callback(); // 流程结束 } } items.map((item) => iterator(item, next)); };</code></pre> <p>async.each 的实现思路确实如上述例子一样简单,当然还可能会有一些复杂的情况需要判断,更深入的内容各位可以移步 Async 官方的 <a href="/misc/goto?guid=4959749837570189188" rel="nofollow,noindex"> each 实现 </a> 中查看更多。</p> <h2><strong>eachLimit</strong></h2> <p>使用 each 执行操作的时候,在量小的情况下是没有问题的,但是当异步操作的量特别大的时候,就需要对其进行一定的控制。比如写一个爬虫去某种网站上爬图片,那么将图片下载到本地的过程中存在一个文件描述符的限制,即同时打开的文件(保存图片时需要openFile)数目超过一定程度就会收到操作系统的报错。</p> <p>以 each 中出现过的例子来说 eachLimit 的功能:</p> <pre> <code class="language-javascript">const sites = [ ... ]; // 可能非常多站点 // 对 each 操作做 limit,同时最多下载 100 个站点图标 async.eachLimit(sites, 100, downloadFavicon, function (err) { if (err) { console.log('err', err); } console.log('over'); });</code></pre> <p>了解了上述需求之后,我们来搭一个 eachLimit 的架子:</p> <pre> <code class="language-javascript">exports.eachLimit = function (items = [], limit = 1, iterator, callback = noop) { if (!Array.isArray(items)) { return callback(new Error('items should be an array!')); } if (typeof iterator != 'function') { return callback(new Error('iterator should be a function!')); } // 同时执行的异步操作数目 (不能超过 limit) let running = 0; // TODO };</code></pre> <p>我首先需要一个循环来将异步操作加入到执行队列,但是只能加到 limit 的数目为止:</p> <pre> <code class="language-javascript">let running = 0; (function next() { while (running < limit) { // 一口气加到队列满为止 let item = items.shift(); running++; iterator(item, (err) => { running--; next(); // 每执行完一个异步操作就触发一下加入队列的行为 }); } })();</code></pre> <p>然后加上结束的操作:</p> <pre> <code class="language-javascript">let done = false; let running = 0; (function next() { if (done && running <= 0) { return callback(); } while (running < limit) { let item = items.shift(); running++; if (item === undefined) { done = true; if (running <= 0) { callback(); } return; } iterator(item, (err) => { running--; next(); }); } })();</code></pre> <p>最后补上错误处理的完整版:</p> <pre> <code class="language-javascript">exports.eachLimit = function (items = [], limit = 1, iterator, callback = noop) { if (!Array.isArray(items)) { return callback(new Error('items should be an array!')); } if (typeof iterator != 'function') { return callback(new Error('iterator should be a function!')); } let done = false; let running = 0; let errored = false; (function next() { if (done && running <= 0) { return callback(); } while (running < limit && !errored) { let item = items.shift(); running++; if (item === undefined) { done = true; if (running <= 0) { callback(); } return; } iterator(item, (err) => { running--; if (err) { errored = true; return callback(err); } next(); }); } })(); };</code></pre> <p> </p> <h2><strong>whilst</strong></h2> <p>最后我们来看一个循环异步 whilst 的实现,也是非常的简单。我们先看看使用例子:</p> <pre> <code class="language-javascript">'use strict'; const async = require('async'); let count = 0; async.whilst( function () { return count < 5; }, function (callback) { console.log('count', count++); setTimeout(callback, 1000); }, function (err) { console.log('over'); } );</code></pre> <p>然后因为比较简单,直接来看代码吧:</p> <pre> <code class="language-javascript">exports.whilst = function (test, iterator, callback = noop) { if (typeof test != 'function') { return callback(new Error('iterator should be a function!')); } if (typeof iterator != 'function') { return callback(new Error('iterator should be a function!')); } (function next() { if (test()) { iterator((err) => { if (err) { return callback(err); } next(); }); } })(); };</code></pre> <p> </p> <h2><strong>小结</strong></h2> <p>综上,本文为 callback 的异步流程封装控制的思路做了一点微小的整理工作。实现上并没有完全遵循原版,而是选择使用 es6 的新特性劲量让代码看起来简(zhuang)洁(bi),整体上是为了展现一个思路可能有不少细节没有处理,完整的部分参见 <a href="/misc/goto?guid=4959749837655770493" rel="nofollow,noindex"> async 官方文档 </a></p> <p>。</p> <p>async 的优点可以简单的说,由于 async 基于原生的 callback 所以相比 promise/co 等方式性较好(目前最快的方式是专门优化了速度的 <a href="/misc/goto?guid=4959749837731304539" rel="nofollow,noindex"> neo-async </a> )。并且 async 提供了非常多、非常全面的 60+ 种异步操作方式,功能可谓十分强大。</p> <p>最后简单提一下 async 的一些缺点:</p> <ol> <li>基于 error first 的 <strong>约定</strong> 。约定的意思就是不是强制的,也就存在不了解这个约定或者使用错误方面的问题。</li> <li>流程没有状态。</li> <li>由于功能太过强大(如 async.auto)存在可能滥用的问题。</li> <li>错误栈曲折排查困难。</li> </ol> <p> </p> <p>来自:https://zhuanlan.zhihu.com/p/27303127</p> <p> </p>