python多进程并发编程
jopen
9年前
Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。
借助这个包,可以轻松完成从单进程到并发执行的转换。
一、单进程编程
如果我们新建少量进程,可以如下:import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."
二、进程池操作函数详细
2.1 类multiprocessing.Pool
class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])进程池对象控制了一个工作进程池,从而可以将任务提交给工作进程池并行处理。
它支持异步结果生成,支持超时回调。
processes: 工作进程的个数。
如果它的值为空,则工作进程个数由 cpu_count()得到;
如果initializer不为空,则每个工作进程在启动时都会调用initializer(*initargs)来初始化。
NOTE:
A. 进程池对象的方法只能被创建进程的池所调用。
B. 在Python Version 2.7中,
参数maxtasksperchild 表示一个工作进程在它被退出并被新的工作进程代替之前可以完成的任务数。
它用来支持向未被使用的资源提交任务。
它的默认值为空,表示工作进程会和池存在一样长。
方法定义:
1). apply_async(func[, args[, kwds[, callback]]])
apply()方法的一个变种,它会返回一个结果对象;
如果指定了callback, 它应该是可被调用的,且能接收单个参数。
当结果已经生成时,就会调用callback。
callback应该要能立即完成,否则处理结果的线程将会被阻塞。
2). map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though).
It blocks until the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks.
The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
3). map_async(func, iterable[, chunksize[, callback]])
A variant of the map() method which returns a result object.
If callback is specified then it should be a callable which accepts a single argument.
When the result becomes ready callback is applied to it (unless the call failed).
callback should complete immediately since otherwise the thread which handles the results will get blocked.
4). imap(func, iterable[, chunksize])
An equivalent of itertools.imap().
The chunksize argument is the same as the one used by the map() method.
For very long iterables using a large value for chunksize can make the job complete much faster than
using the default value of 1.
Also if chunksize is 1 then the next() method of the iterator returned by the imap() method has an optional
timeout parameter: next(timeout) will raise multiprocessing.TimeoutError
if the result cannot be returned within timeout seconds.
5). imap_unordered(func, iterable[, chunksize])
The same as imap() except that the ordering of the results from the returned iterator should be considered arbitrary.
(Only when there is only one worker process is the order guaranteed to be “correct”.)
6). close()
关于进程池的任务提交,当所有任务完成后,工作进程将退出。
7). terminate()
立即停止工作进程而不管工作是否完成。
当进程池对象被回收时,必须立即调用 terminate()
8). join()
等待所有工作进程退出。
在调用它之前,必须先调用 close() 或 terminate()。
2.2 类 multiprocessing.pool.AsyncResult
class multiprocessing.pool.AsyncResult由Pool.apply_async() and Pool.map_async()返回的结果的类
1). get([timeout])
返回生成的结果。
如果 timeout 不为空时,如果结果在 timeout秒内未生成,将会报 multiprocessing.TimeoutError 异常。
如果远程调用报了异常,那这个异常也会被get()提交。
2). wait([timeout])
等待直到结果生成或超时。
3). ready()
返回调用是否完成。
4). successful()
返回调用是否完成并且没有提交异常。
如果结果未准备好,则报AssertionError异常。
三、应用示例
The following example demonstrates the use of a pool:1. 示例一
from multiprocessing import Pooldef f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print it.next() # prints "0"
print it.next() # prints "1"
print it.next(timeout=1) # prints "4" unless your computer is *very* slow
import time
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises TimeoutError
2、使用进程池
是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。注意要用apply_async,如果落下async,就变成阻塞版本了。
processes=4是最多并发进程数量。代码如下
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."
3、使用Pool,并需要关注结果
更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,代码如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
4. 示例四
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,10几个还好,
但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,
那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,
直到池中有进程结束,才会创建新的进程来它。这里有一个简单的例子:
#!/usr/bin/env python
#coding=utf-8
"""
Author: Squall
Last modified: 2011-10-18 16:50
Filename: pool.py
Description: a simple sample for pool class
"""
from multiprocessing import Pool
from time import sleep
def f(x):
for i in range(10):
print '%s --- %s ' % (i, x)
sleep(1)
def main():
pool = Pool(processes=3) # set the processes max number 3
for i in range(11,20):
result = pool.apply_async(f, (i,))
pool.close()
pool.join()
if result.successful():
print 'successful'
if __name__ == "__main__":
main()
先创建容量为3的进程池,然后将f(i)依次传递给它,运行脚本后利用ps aux | grep pool.py查看进程情况,
会发现最多只会有三个进程执行。pool.apply_async()用来向进程池提交目标请求,
pool.join()是用来等待进程池中的worker进程执行完毕,防止主进程在worker进程结束前结束。
但必pool.join()必须使用在pool.close()或者pool.terminate()之后。
其中close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。
result.successful()表示整个调用执行的状态,如果还有worker没有执行完,则会抛出AssertionError异常。
利用multiprocessing下的Pool可以很方便的同时自动处理几百或者上千个并行操作,脚本的复杂性也大大降低。