Python 异步网络爬虫 I
BasilHLIV
8年前
<p>本文主要讨论下面几个问题:</p> <ul> <li> <p>什么是异步(Asynchronous)编程?</p> </li> <li> <p>为什么要使用异步编程?</p> </li> <li> <p>在 Python 中有哪些实现异步编程的方法?</p> </li> <li> <p>Python 3.5 如何使用 async/await 实现异步网络爬虫?</p> </li> </ul> <p>所谓 <strong>异步</strong> 是相对于 <strong>同步(Synchronous)</strong> 的概念来说的,之所以容易造成混乱,是因为刚开始接触这两个概念时容易把 <strong>同步</strong> 看做是 <strong>同时</strong> ,而 <strong>同时</strong> 不是意味着 <strong>并行(Parallel)</strong> 吗?然而实际上同步或者异步是针对于 <strong>时间轴</strong> 的概念,同步意味着 <strong>顺序、统一的时间轴</strong> ,而异步则意味着 <strong>乱序、效率优先的时间轴</strong> 。比如在爬虫运行时,先抓取 A 页面,然后从中提取下一层页面 B 的链接,此时的爬虫程序的运行只能是同步的,B 页面只能等到 A 页面处理完成之后才能抓取;然而对于独立的两个页面 A1 和 A2,在处理 A1 网络请求的时间里,与其让 CPU 空闲而 A2 等在后面,不如先处理 A2,等到谁先完成网络请求谁就先来进行处理,这样可以更加充分地利用 CPU,但是 A1 和 A2 的执行顺序则是不确定的,也就是异步的。</p> <p>很显然,在某些情况下采用异步编程可以提高程序运行效率,减少不必要的等待时间,而之所以能够做到这一点,是因为计算机的 CPU 与其它设备是独立运作的,同时 CPU 的运行效率远高于其他设备的读写(I/O)效率。为了利用异步编程的优势,人们想出了很多方法来重新安排、调度(Schedule)程序的运行顺序,从而最大化 CPU 的使用率,其中包括进程、线程、协程等(具体可参考《 Python 中的进程、线程、协程、同步、异步、回调 》)。在 Python 3.5 以前通过 @types.coroutine 作为修饰器的方式将一个生成器(Generator)转化为一个协程,而在 Python 3.5 中则通过关键词 async/await 来定义一个协程,同时也将 asyncio 纳入为标准库,用于实现基于协程的异步编程。</p> <p>要使用 asyncio 需要理解下面几个概念:</p> <ul> <li> <p>Event loop</p> </li> <li> <p>Coroutine</p> </li> <li> <p>Future & Task</p> </li> </ul> <h3><strong>Event loop</strong></h3> <p>了解 JavaScript 或 Node.js 肯定对事件循环不陌生,我们可以把它看作是一种循环式(loop)的调度机制,它可以安排需要 CPU 执行的操作优先执行,而会被 I/O 阻塞的行为则进入等待队列:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/c0bda6246d9482e1fc70b088273ce6cf.png"></p> <p>asyncio 自带了事件循环:</p> <pre> <code class="language-python">import asyncio loop = anscio.get_event_loop() # loop.run_until_complete(coro()) loop.close()</code></pre> <p>当然你也可以选择其它的实现形式,例如 Sanic 框架采用的 <a href="/misc/goto?guid=4959722886547194200" rel="nofollow,noindex"> uvloop </a> ,用起来也非常简单( 至于性能上是否更优我没有验证过,但至少在 Jupyter Notebook 上 uvloop 用起来更方便):</p> <pre> <code class="language-python">import asyncio import uvloop loop = uvloop.new_event_loop() asyncio.set_event_loop(loop)</code></pre> <h3><strong>Coroutine</strong></h3> <p>Python 3.5 以后推荐使用 async/await 关键词来定义协程,它具有如下特性:</p> <ul> <li>通过 await 将可能阻塞的行为挂起,直到有结果之后继续执行,Event loop 也是据此来对多个协程的执行进行调度的;</li> <li>协程并不像一般的函数一样,通过 coro() 进行调用并不会执行它,而只有将它放入 Event loop 进行调度才能执行。</li> </ul> <p>一个简单的例子:</p> <pre> <code class="language-python">import uvloop import asyncio loop = uvloop.new_event_loop() asyncio.set_event_loop(loop) async def compute(a, b): print("Computing {} + {}...".format(a, b)) await asyncio.sleep(a+b) return a + b tasks = [] for i, j in zip(range(3), range(3)): print(i, j) tasks.append(compute(i, j)) loop.run_until_complete(asyncio.gather(*tasks)) loop.close() ### OUTPUT """ 0 0 1 1 2 2 Computing 0 + 0... Computing 1 + 1... Computing 2 + 2... CPU times: user 1.05 ms, sys: 1.21 ms, total: 2.26 ms Wall time: 4 s """</code></pre> <p>由于我们没办法知道协程将在什么时候调用及返回, asyncio 中提供了 Future 这一对象来追踪它的执行结果。</p> <h3><strong>Future & Task</strong></h3> <p>Future 相当于 JavaScript 中的 Promise ,用于保存 <strong>未来</strong> 可能返回的结果。而 Task 则是 Future 的子类,与 Future 不同的是它包含了一个将要执行的协程( 从而组成一个需要被调度的任务)。还以上面的程序为例,如果想要知道计算结果,可以通过 asyncio.ensure_future() 方法将协程包裹成 Task ,最后再来读取结果:</p> <pre> <code class="language-python">import uvloop import asyncio loop = uvloop.new_event_loop() asyncio.set_event_loop(loop) async def compute(a, b): print("Computing {} + {}...".format(a, b)) await asyncio.sleep(a+b) return a + b tasks = [] for i, j in zip(range(3), range(3)): print(i, j) tasks.append(asyncio.ensure_future(compute(i, j))) loop.run_until_complete(asyncio.gather(*tasks)) for t in tasks: print(t.result()) loop.close() ### OUTPUT """ 0 0 1 1 2 2 Computing 0 + 0... Computing 1 + 1... Computing 2 + 2... 0 2 4 CPU times: user 1.62 ms, sys: 1.86 ms, total: 3.49 ms Wall time: 4.01 s """</code></pre> <h3><strong>异步网络请求</strong></h3> <p>Python 处理网络请求最好用的库就是 requests (应该没有之一),但由于它的请求过程是同步阻塞的,因此只好选用 aiohttp 。为了对比同步与异步情况下的差异,先伪造一个假的异步处理服务器:</p> <pre> <code class="language-python">from sanic import Sanic from sanic.response import text import asyncio app = Sanic(__name__) @app.route("/<word>") @app.route("/") async def index(req, word=""): t = len(word) / 10 await asyncio.sleep(t) return text("It costs {}s to process `{}`!".format(t, word)) app.run()</code></pre> <p>服务器处理耗时与请求参数( word )长度成正比,采用同步请求方式,运行结果如下:</p> <pre> <code class="language-python">import requests as req URL = "http://127.0.0.1:8000/{}" words = ["Hello", "Python", "Fans", "!"] for word in words: resp = req.get(URL.format(word)) print(resp.text) ### OUTPUT """ It costs 0.5s to process `Hello`! It costs 0.6s to process `Python`! It costs 0.4s to process `Fans`! It costs 0.1s to process `!`! CPU times: user 18.5 ms, sys: 2.98 ms, total: 21.4 ms Wall time: 1.64 s """</code></pre> <p>采用异步请求,运行结果如下:</p> <pre> <code class="language-python">import asyncio import aiohttp import uvloop URL = "http://127.0.0.1:8000/{}" words = ["Hello", "Python", "Fans", "!"] async def getPage(session, word): with aiohttp.Timeout(10): async with session.get(URL.format(word)) as resp: print(await resp.text()) loop = uvloop.new_event_loop() asyncio.set_event_loop(loop) session = aiohttp.ClientSession(loop=loop) tasks = [] for word in words: tasks.append(getPage(session, word)) loop.run_until_complete(asyncio.gather(*tasks)) loop.close() session.close() ### OUTPUT """ It costs 0.1s to process `!`! It costs 0.4s to process `Fans`! It costs 0.5s to process `Hello`! It costs 0.6s to process `Python`! CPU times: user 61.2 ms, sys: 18.2 ms, total: 79.3 ms Wall time: 732 ms """</code></pre> <p>从运行时间上来看效果是很明显的。</p> <p> </p> <h3><strong>参考</strong></h3> <ol> <li><a href="/misc/goto?guid=4959670073988273126" rel="nofollow,noindex">Python 中的进程、线程、协程、同步、异步、回调</a></li> <li><a href="/misc/goto?guid=4959722886686055913" rel="nofollow,noindex">Asyncio Document</a></li> <li><a href="/misc/goto?guid=4959722886781252047" rel="nofollow,noindex">aiohttp - HTTP Client</a></li> </ol> <p> </p> <p> </p> <p>来自:http://blog.rainy.im/2016/10/30/python-async-webscraper-i/</p> <p> </p>