Celery 实现分布式任务队列
Celery 简介
Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。
在 Python 中定义 Celery 的时候,我们要引入 Broker,中文中有中间人的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。
这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。
Show me the code
# 以下为 dispatcher.py from worker import divide # 1 divide.delay(1,2) # 2 divide.apply_async((1, 2)) # 以下为 worker.py from celery import Celery app = Celery('tasks', backend='amqp://guest@localhost//', broker='redis://') @app.task def divide(x, y): print x / y
worker.py 中新建了一个 Celery 实例,以 amqp 作为 broker,以 redis 作为 backend 储存所有 task 执行的历史记录。我们在此例中使用 RabbitMQ 作为我们的消息队列服务器。
我们一方面通过命令行中执行以下语句来启动 celery 服务。
celery -A worker worker --loglevel=info
另外一方面,我们运行 dispatch.py,代码中将 worker 中的 divide 函数导入,再接着以两种方式将 task 启动。第一种方法中的 delay 方法接收了两个参数,实际为第二种方法的便捷调用,第二种方法在使用时,要将我们要传给 divide 的参数作为 tuple 放在第一个参数位置。
apply_async 的其他参数
apply_async 还支持其他参数,比如设置回调。
设置 task 实例的回调可以采用 link:
divide.apply_async((16, 2), link=divide.s(8))
首先计算 16 / 2,然后把结果 8 / 8,最后执行的结果等于 1. 所以这里的 link 是指向一个后继的调用函数,即完成当前 divide 以后再进行下一个 divide 操作。除了 link 之外,还有 link_error,只会在该任务执行失败时调用。在本例中,我们可以在 divide 执行失败时,执行 link_error 所指的函数,这个函数就是错误消息的处理句柄,它会接收到一个 task 的 UUID,我们可以通过 UUID 来访问出错的任务的异常状态。
# dispatcher divide.apply_async((1, 0), link_error=error_handler.s()) # 这里我们把 1 和 0 放到了 divide 函数中执行,引发了除零异常,继而执行 link_error 对应的 error_handler,error_handler 接收到 uuid 参数,通过 AsyncResult 生产一个结果实例,我们可以用 result.state 打印出该任务的执行情况,用 result.info 来获取异常的具体信息。 # worker @app.task def divide(x, y): print x/y @app.task def error_handler(uuid): result = AsyncResult(uuid) print 'task error {0}'.format(uuid) # [2015-09-01 13:43:26,569: WARNING/Worker-2] task error 8e516377-a6c0-4a40-934f-dd1b0692c5fa print result.state # [2015-09-01 13:43:26,572: WARNING/Worker-2] FAILURE print result.info # [2015-09-01 13:43:26,572: WARNING/Worker-2] integer division or modulo by zero
跟踪异常的成因
异常的成因我们可以如上述代码所示将 result.info 打印出来而得知。然而我们并不能满足于此,仅仅知晓出错的 task 的 UUID 和其状态是不够的,我们想要知道发生错误时,task 的传入参数是什么。我一开始没有尝试出通过 UUID 来获取到原来的 1 和 0 这两个参数,后来我追踪了 apply_async 这个函数,位于 task.py 中,再跟踪到 trace.py 中的 build_tracer 函数,果然在 link_error 的调用时只传递了 UUID 一个参数,代码如下:
def on_error(request, exc, uuid, state=FAILURE, call_errbacks=True): if propagate: raise I = Info(state, exc) R = I.handle_error_state(task, eager=eager) if call_errbacks: group( [signature(errback, app=app) for errback in request.errbacks or []], app=app, ).apply_async((uuid, )) # ).apply_async((uuid, request.args)) # 可以改成上一行注释中的代码,这样就可以在 error_handler 中得到原来调用的任务的输入参数了。 return I, R, I.state, I.retval
此处通过修改 celery 源代码来获取出错时 task 的传入参数,但是方法并不好。于是我想能不能通过 UUID 直接获取到原来的 task,然后查看 task 的 args,但是这篇文档有些晦涩难懂,我就先放弃了,便发现了以下方法。
class DebugTask(Task): abstract = True def on_failure(self, *args, **kwargs): print self.request.args @app.task(base=DebugTask) def divide(x, y): print x / y
这段代码将原来应该继承的 Task 类中的 on_failure 函数重写,当 divide 函数发生异常时,该 task 的 state 自动变成 failure,Task 会自动调用 on_failure 函数,从而打印出传入的 args。
任务的远程调用
关于 task 的调用,celery 还提供了另外一种 send_task 方法。
Celery 作为分布式系统,自然就支持远程 worker,这个时候我们可以利用 send_task 这个函数,以函数名的方式调用 task。代码如下:
from celery import Celery app = Celery() app.config_from_object('celeryconfig') app.send_task('worker.divide', args=[1, 0]) # send_task 也支持 link_error,这个官方文档上没写详细,这里需要调用 signature 函数来生产函数的 signature,这时 divide 的 UUID 和我们通过修改源代码得到的 args。 app.send_task('worker.divide', args=[1, 0], link_error=app.signature('worker.error_handler'))
这里我们没有通过 module 的方式把 divide 函数给 import 到程序中来,也就意味着我们可以不将 worker 放在与 dispatcher 同一目录下。我们的想法是,将 worker 放在另外一台服务器上,通过 celery 调用它,本地 django 项目调用这个 dispatcher 后,将 task 发送到远程服务器的队列中,然后由远程服务器中的 worker 处理。
配置文件
此时需要注意的是,这里的 dispatcher 是通过文件的方式配置的,其配置文件应与 worker 端配置文件吻合,如下:
# celeryconfig.py # coding=utf-8 # Broker 设置 RabbitMQ BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_RESULT_BACKEND = 'redis://' # Tasks 位于 worker.py 中 CELERY_IMPORTS = ('worker', ) # 默认为1次/秒的任务 CELERY_ANNOTATIONS = {'worker.divide': {'rate_limit': '1/s'}} CELERY_ROUTES = {'worker.divide': {'queue': 'divide'}, 'worker.error_handler': {'queue': 'error'}} # 默认所有格式为 json CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT=['json']
使用了配置文件以后,我们在 worker 中也可以采用相同的方式定义 app,如下:
# coding=utf-8 from celery import Celery app = Celery() app.config_from_object('celeryconfig') @app.task def divide(x, y): print x / y
我们在配置文件中为 worker.divide 这个 task 指定了 divide 这个队列,为 error_handler 定义了 error 这个队列用于错误处理。在启动 celery 的时候可以通过 -Q 参数指定队列。在终端中执行了以下命令后,celery 服务器就启动了,当前 celery 会监视 divide 队列,取出参数执行任务。而如果我们不启动另外一个 celery 来监视 error 队列,error_handler 就不会前往队列去拿参数执行。
celery -A worker worker --loglevel=info -Q divide
关于 Celery,网上英文教程都不多,更别说中文的了。
网上有些关于 Celery 性能的讨论,我暂且没有做分析,如果有更好的解决方案能够替代它,请留言告知。
如果发现本文有错误,请指正。
来自:http://my.oschina.net/shinedev/blog/500120