Python Concurrent Programming: Coroutines and Asynchronous IO
Hester8856
8 years ago
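<p style="text-align:center"><img src="https://simg.open-open.com/show/e6e1fd74659662ff5bf16b051ffda8c0.png"></p>
<h2>Introduction</h2>
<p>With the rise of Node.js, most of you have probably heard the term asynchronous programming at some point this year. The Python community was somewhat slower than other languages to support asynchronous programming, but Python 3.4 added asyncio, Python 3.5 added syntax-level support with async/await, and in the just-released Python 3.6 asyncio has moved from provisional to stable status. Below, based on <strong>Python 3.4+</strong>, we look at the concepts behind asynchronous programming and at how to use asyncio.</p>
<h2>What is a coroutine</h2>
<p>Concurrency in Python is usually implemented with multiple threads or multiple processes. For CPU-bound tasks we usually use multiple processes because of the GIL. For IO-bound tasks, a thread releases the GIL while it waits on IO, so thread scheduling lets other threads run in the meantime, which gives apparent concurrency.</p>
<p>For IO-bound tasks there is another option: coroutines. <strong>Coroutines are “concurrency” running inside a single thread</strong>. A big advantage of coroutines over threads is that switching between them is done by the program itself rather than by the operating system, which saves the cost of thread switching and makes them more efficient. Python's asyncio is also built on coroutines. Before getting into asyncio, let's see how generators can be used as coroutines in Python to achieve concurrency.</p>
<h3>example1</h3>
<p>Let's start with a simple example to see what a coroutine is.</p>
<pre> <code class="language-python">>>> def coroutine():
...     reply = yield 'hello'
...     yield reply
...
>>> c = coroutine()
>>> next(c)
'hello'
>>> c.send('world')
'world'
</code></pre>
<p>next(c) runs the generator up to the first yield and returns 'hello'. c.send('world') then resumes it: 'world' becomes the value of the paused yield expression and is assigned to reply, and execution continues to the next yield, which hands reply back to the caller. This ability to pause, receive a value from the caller, and resume is what lets a generator act as a coroutine.</p>
<h3>example2</h3>
<p>The program below <strong>simulates several students handing in homework to one teacher at the same time</strong>. Traditionally we might use multiple threads or processes for this, but here we use generators as coroutines to simulate concurrency.</p>
<p>If the program is a bit hard to follow, feel free to skip ahead; it won't affect the rest of the article. Once you understand what coroutines really are, it will be easy to come back to.</p>
<pre> <code class="language-python">from collections import deque


def student(name, homeworks):
    for homework in homeworks.items():
        yield (name, homework[0], homework[1])  # the student "produces" homework for the teacher


class Teacher(object):
    def __init__(self, students):
        self.students = deque(students)

    def handle(self):
        """The teacher handles the students' homework."""
        while len(self.students):
            student = self.students.pop()  # take the next student generator from the right end
            try:
                homework = next(student)  # resume it until it yields one piece of homework
                print('handling', homework[0], homework[1], homework[2])
            except StopIteration:
                pass  # this student has no homework left, so drop them
            else:
                self.students.appendleft(student)  # requeue at the left end for a later turn
</code></pre>
<p>Now let's run it.</p>
<pre> <code class="language-python">Teacher([
    student('Student1', {'math': '1+1=2', 'cs': 'operating system'}),
    student('Student2', {'math': '2+2=4', 'cs': 'computer graphics'}),
    student('Student3', {'math': '3+3=5', 'cs': 'compiler construction'})
]).handle()
</code></pre> 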
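<p>Here is the output. Using nothing more than simple generators we have achieved concurrency, not parallelism: the program runs in a single thread, and the student generators simply take turns handing over one piece of homework each.</p>
<pre> <code class="language-python">handling Student3 cs compiler construction
handling Student2 cs computer graphics
handling Student1 cs operating system
handling Student3 math 3+3=5
handling Student2 math 2+2=4
handling Student1 math 1+1=2
</code></pre>
<p>The cs entries appear first here because dict ordering was arbitrary in the Python versions of the time; on Python 3.7+, where dicts preserve insertion order, each student's math homework is handled before their cs homework.</p>
<h2>Implementing coroutines with the asyncio module</h2>
<p>Since Python 3.4 the asyncio module has been part of the standard library, and with it we can easily use coroutines to perform asynchronous IO.</p>
<p>In the code below we create a coroutine <em>display_date(num, loop)</em>, which uses the <a href="/misc/goto?guid=4959730533494010389" rel="nofollow,noindex">yield from</a> keyword to wait for the result of the coroutine <em>asyncio.sleep(2)</em>. During those 2 seconds it hands control back to the event loop instead of blocking the thread, so the other coroutine can run until asyncio.sleep(2) returns. Note that generator-based coroutines declared with @asyncio.coroutine were deprecated in Python 3.8 and removed in Python 3.11, so this version only runs on Python 3.4 to 3.10; on newer versions use the async/await form shown later.</p>
<pre> <code class="language-python"># coroutine.py
import asyncio
import datetime


@asyncio.coroutine  # declare a (generator-based) coroutine
def display_date(num, loop):
    end_time = loop.time() + 10.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(2)  # suspend until the sleep(2) coroutine finishes


loop = asyncio.get_event_loop()  # get an event loop
tasks = [display_date(1, loop), display_date(2, loop)]
loop.run_until_complete(asyncio.gather(*tasks))  # "block" until all tasks are done
loop.close()
</code></pre>
<p>Here is the output. Notice the concurrency: the whole program finishes in about 10 s, whereas running the two coroutines one after the other would take about 20 s, and we did not write any multithreading or multiprocessing code. In a real project you would replace asyncio.sleep(seconds) with an actual asynchronous IO operation, such as a network request or a database query through an async driver. Ordinary blocking calls, including plain file reads, would block the event loop and need to be handed to an executor instead.</p>
<pre> <code class="language-python">ziwenxie :: ~ » python coroutine.py
Loop: 1 Time: 2016-12-19 16:06:46.515329
Loop: 2 Time: 2016-12-19 16:06:46.515446
Loop: 1 Time: 2016-12-19 16:06:48.517613
Loop: 2 Time: 2016-12-19 16:06:48.517724
Loop: 1 Time: 2016-12-19 16:06:50.520005
Loop: 2 Time: 2016-12-19 16:06:50.520169
Loop: 1 Time: 2016-12-19 16:06:52.522452
Loop: 2 Time: 2016-12-19 16:06:52.522567
Loop: 1 Time: 2016-12-19 16:06:54.524889
Loop: 2 Time: 2016-12-19 16:06:54.525031
Loop: 1 Time: 2016-12-19 16:06:56.527713
Loop: 2 Time: 2016-12-19 16:06:56.528102
</code></pre> 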
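<p>To check the concurrency claim above, here is a minimal sketch that times two coroutines. It assumes Python 3.7+ (for asyncio.run()) and uses async/await instead of @asyncio.coroutine; the helper names tick(), blocking_tick() and timed() and the 0.2 s delays are hypothetical and chosen only to keep the run short. Two coroutines that each await asyncio.sleep(0.2) three times should finish in roughly 0.6 s in total, while the same structure using the blocking time.sleep() should take roughly 1.2 s, because a blocking call never returns control to the event loop.</p>
<pre> <code class="language-python">import asyncio
import time


async def tick(num, rounds, delay):
    """Hypothetical helper: print, then wait without blocking the thread."""
    for _ in range(rounds):
        print("Loop: {} Time: {:.2f}".format(num, time.perf_counter()))
        await asyncio.sleep(delay)  # hands control back to the event loop


async def blocking_tick(num, rounds, delay):
    """Hypothetical helper with the same shape, but time.sleep() blocks the thread."""
    for _ in range(rounds):
        print("Loop: {} Time: {:.2f}".format(num, time.perf_counter()))
        time.sleep(delay)  # the event loop cannot switch to the other coroutine


async def timed(worker):
    """Run two copies of worker concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(worker(1, 3, 0.2), worker(2, 3, 0.2))
    return time.perf_counter() - start


if __name__ == "__main__":
    print("asyncio.sleep: {:.2f} s".format(asyncio.run(timed(tick))))  # roughly 0.6 s
    print("time.sleep: {:.2f} s".format(asyncio.run(timed(blocking_tick))))  # roughly 1.2 s
</code></pre>
<p>In other words, only operations that are awaited and do not block the thread overlap. A blocking call inside a coroutine serializes everything, which is why blocking libraries have to be run in an executor, as the crawler example later in this article does with loop.run_in_executor().</p>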
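<p>Python 3.5 provides more direct support for coroutines with the async/await keywords. The coroutine.py example above can be rewritten as follows: async def replaces @asyncio.coroutine and await replaces yield from, which makes the code more concise and readable.</p>
<pre> <code class="language-python">import asyncio
import datetime


async def display_date(num, loop):  # declare a coroutine
    end_time = loop.time() + 10.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(2)  # equivalent to yield from


# Create an event loop and make it the current one; relying on
# asyncio.get_event_loop() to create it implicitly is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [display_date(1, loop), display_date(2, loop)]
loop.run_until_complete(asyncio.gather(*tasks))  # "block" until all tasks are done
loop.close()
</code></pre>
<h2>asyncio in detail</h2>
<p>There are two ways to run the event loop: run_until_complete and run_forever. run_until_complete registers a done callback on the future internally, which stops the loop as soon as the future finishes. run_forever keeps the loop running until loop.stop() is called, so you attach your own callback with add_done_callback to handle the result and stop the loop. The two examples below show the difference.</p>
<h3>run_until_complete()</h3>
<pre> <code class="language-python">import asyncio


async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)  # make it current so Future() and ensure_future() use it
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))  # schedule slow_operation as a task
print(loop.is_running())  # False
loop.run_until_complete(future)  # run the loop until future has a result
print(future.result())  # Future is done!
loop.close()
</code></pre>
<h3>run_forever()</h3>
<p>Compared with run_until_complete, here we register the callback ourselves with add_done_callback, so we can run our own follow-up code when the task (future) completes; got_result() prints the result and then stops the loop.</p>
<pre> <code class="language-python">import asyncio


async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')


def got_result(future):
    print(future.result())
    loop.stop()  # run_forever() returns once the loop is stopped


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)  # called when future gets its result
try:
    loop.run_forever()
finally:
    loop.close()
</code></pre>
<p>One more thing to watch out for: calling a coroutine function does not run it, and the coroutine will not execute unless the event loop is running. I was bitten by this myself. The official documentation puts it this way:</p>
<p>Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. 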
There are two basic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), <strong>or schedule</strong> its execution using the ensure_future() function or the AbstractEventLoop.create_task() method. Coroutines (and tasks) can only run when the event loop is running.</p>
<h3>Call</h3>
<p>call_soon()</p>
<pre> <code class="language-python">import asyncio


def hello_world(loop):
    print('Hello World')
    loop.stop()


loop = asyncio.new_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
</code></pre>
<p>Here is the output. call_soon() registers a callback in advance, to be run on the next iteration of the event loop, and returns a Handle whose cancel() method can be used to cancel the call.</p>
<pre> <code class="language-python">Hello World
</code></pre>
<p>call_later()</p>
<pre> <code class="language-python">import asyncio
import datetime


def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)  # run again in 1 second
    else:
        loop.stop()


loop = asyncio.new_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
</code></pre>
<p>This modifies the previous example to show how call_later() is used. Note that there is no while loop here, unlike the earlier display_date() coroutine; instead, call_later() schedules display_date() to be called again every second until end_time is reached. The output:</p>
<pre> <code class="language-python">2016-12-24 19:17:13.421649
2016-12-24 19:17:14.422933
2016-12-24 19:17:15.424315
2016-12-24 19:17:16.425571
2016-12-24 19:17:17.426874
</code></pre>
<h3>Chain coroutines</h3>
<pre> <code class="language-python">import asyncio


async def compute(x, y):
    print("Compute %s + %s ..."
          % (x, y))
    await asyncio.sleep(1.0)  # compute is suspended here until sleep finishes
    return x + y


async def print_sum(x, y):
    result = await compute(x, y)  # print_sum is suspended here until compute returns
    print("%s + %s = %s" % (x, y, result))


loop = asyncio.new_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
</code></pre>
<p>Output:</p>
<pre> <code class="language-python">ziwenxie :: ~ » python chain.py
Compute 1 + 2 ...
1 + 2 = 3
</code></pre>
<h2>Using asyncio for asynchronous IO in a web crawler</h2>
<p>Let's look at a simple example of how to use asyncio in a Python web crawler. By the way: based on my limited experiments, to get the full benefit of asyncio you should use aiohttp rather than requests (requests is blocking, which is why it has to be pushed into a thread pool below). You should also make <strong>judicious use</strong> of the thread/process pools provided by the concurrent.futures module; I will cover this in my next post.</p>
<pre> <code class="language-python">import asyncio

import requests


async def spider(loop):
    # run_in_executor returns a Future, not a coroutine object; with None as the
    # executor, requests.get runs in the loop's default thread pool
    future1 = loop.run_in_executor(None, requests.get, 'https://www.python.org/')
    future2 = loop.run_in_executor(None, requests.get, 'http://httpbin.org/')
    # the two network requests above now run concurrently (observable from the command line)
    response1 = await future1  # spider is suspended until future1 completes
    response2 = await future2  # spider is suspended until future2 completes
    print(len(response1.text))
    print(len(response2.text))
    return 'done'


loop = asyncio.new_event_loop()
# If the argument is a coroutine object, it is wrapped by ensure_future().
result = loop.run_until_complete(spider(loop))
print(result)
loop.close()
</code></pre>
<p>P.S.: If you can work out for yourself why blindly using thread or process pools does not make asyncio-based programs faster, you probably understand coroutines pretty well.</p>
<h2>References</h2>
<p><a href="/misc/goto?guid=4958968331496731648" rel="nofollow,noindex">Documentation of asyncio</a></p>
<p><a href="/misc/goto?guid=4959730533615164021" rel="nofollow,noindex">Coroutines and async/await</a></p>
<p><a href="/misc/goto?guid=4959730533694037320" rel="nofollow,noindex">Stack Overflow</a></p>
<p><a href="/misc/goto?guid=4959730533777558705" rel="nofollow,noindex">PyMOTW-3</a></p>
<p>Source: http://python.jobbole.com/87202/</p>