Celery任务队列
什么是任务队列(Task Queue)?
使用任务队列作为分发任务的机制。
一个任务队列的输入是一组被称为任务的工作单元。专用的工人会持续监听任务队列来等待完成新的工作。
Celery通过消息进行通信,通常使用中间人作为客户端和工人(workers)间的媒介。为了初始化一项任务,客户端会添加一条消息到队列中,然后中间人传递这条消息给一个worker。
一个Celery系统可以包含多个工人和中间人,解决高可用可平行扩展问题。
我需要什么?
Celery需要一个消息传输系统来收发消息。RabbitMQ and Redis传输系统功能完备,但也有很多其他的实验性解决方案,如使用SQLite做本地开发。Celery 是用 Python 编写的,但协议可以用任何语言实现。除了 Python 语言实现之外,还有Node.js的node-celery和php的celery-php。 可以通过暴露 HTTP 的方式进行,任务交互以及其它语言的集成开发。
Celery可以在单一机器上,在多台机器上,甚至跨数据中心运行。
Celery任务队列
Celery 是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。
核心部件
- broker
- 消息队列,由第三方消息中间件完成
- 常见有RabbitMQ, Redis, MongoDB等
- worker
- 任务执行器
- 可以有多个worker进程
- worker又可以起多个queue来并行消费消息
- backend
- 后端存储,用于持久化任务执行结果
功能部件
- beat
- 定时器,用于周期性调起任务
- flower
- web管理界面
任务
基本用法是在程序里引用celery,并将函数方法绑定到task
1 | from celery import Celery |
然后调用相应方法即可(delay与apply_async都是异步调用)
1 | from tasks import add |
由于是采用消息队列,因此任务提交之后,程序立刻返回一个任务ID。
之后可以通过该ID查询该任务的执行状态和结果。
关联任务
执行1个任务,完成后再执行第2个,第一个任务的结果做第二个任务的入参
1 | add.apply_async((2, 2), link=add.s(16)) |
还可以做错误处理
1 |
|
定时任务
让任务在指定的时间执行,与下文叙述的周期性任务是不同的。
- ETA, 指定任务执行时间,注意时区
- countdown, 倒计时,单位秒
1 | from datetime import datetime, timedelta |
tip
- 任务的信息是保存在broker中的,因此关闭worker并不会丢失任务信息
- 回收任务(revoke)并非是将队列中的任务删除,而是在worker的内存中保存回收的任务task-id,不同worker之间会自动同步上述revoked task-id。
- 由于信息是保存在内存当中的,因此如果将所有worker都关闭了,revoked task-id信息就丢失了,回收过的任务就又可以执行了。要防治这点,需要在启动worker时指定一个文件用于保存信息
1 | celery -A app.celery worker --loglevel=info &> celery_worker.log --statedb=/var/tmp/celery_worker.state |
过期时间
expires单位秒,超过过期时间还未开始执行的任务会被回收
1 | add.apply_async((10, 10), expires=60) |
重试
max_retries:最大重试次数
interval_start:重试等待时间
interval_step:每次重试叠加时长,假设第一重试等待1s,第二次等待1+n秒
interval_max:最大等待时间
1
2
3
4
5
6add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
任务路由
使用-Q参数为队列(queue)命名,然后调用任务时可以指定相应队列
1 | $ celery -A proj worker -l info -Q celery,priority.high |
工作流
按照一定关系一次调用多个任务
- group: 并行调度
- chain: 串行调度
- chord: 类似group,但分header和body2个部分,header可以是一个group任务,执行完成后调用body的任务
- map: 映射调度,通过输入多个入参来多次调度同一个任务
- starmap: 类似map,入参类似*args
- chunks:将任务按照一定数量进行分组
周期性任务
周期性任务就是按照一定的时间检查反复执行的任务。前面描述的定时任务值的是一次性的任务。
程序中引入并配置好周期性任务后,beat进程就会定期调起相关任务
beat进程是需要单独启动的
1 | $ celery -A proj beat |
或者在worker启动时一起拉起
1 | $ celery -A proj worker -B |
注意一套celery只能启一个beat进程