Github -> https://github.com/celery/celery
Docs -> https://docs.celeryq.dev/en/stable/index.html
简单了解
- Celery 是一个异步任务队列, 用于分布式调度任务
- 组件
- Celery Beat 任务调度器
用于发布任务到任务队列 - Celery Workers 执行任务的消费者
默认情况下 Celery 会在不同进程执行任务, 可以分布到多台机器 - Broker 消息代理
用来传递任务的, 通常搭配RabbitMQ
orRedis
使用 - Producer 任务生产者
调用Celery提供的API进行任务添加 - Result Backend 结果后端
保存了任务结果以供查询
- Celery Beat 任务调度器
安装
pip install -U Celery |
这里用 Redis
作为传递消息队列和后端, 可以直接安装捆绑包.
pip install -U "celery[redis]" |
例子
创建文件 dome.py
和 test.py
. |
dome.py
import time |
运行Celery
celery -A dome worker -l info -c 8 |
-A
: 指定创建的celery对象的位置dome
里有初始化后的实例,worker
表示该实例是任务执行者-l
: 输出的日志等级-c
: 运行的worker数量(并发), 默认和CPU核心数一样
测试
任务调用 -> https://docs.celeryq.dev/en/stable/userguide/calling.html
进入pyhton
交互模式或编写test.py
测试文件测试都可
import dome |
delay()
: 用于异步调用任务# 返回的是 AsyncResult 实例
dome.add.delay(1,2)apply_async()
: 和delay()
作用一样, 但支持更多操作dome.add.apply_async(args=[1,2])
s()
ready()
: 返回任务完成状态result.ready()
get()
: 获取任务结果(执行过程是阻塞的)# 设置超时, 超时未完成抛出 TimeoutError
result.get(timeout=1)
# 忽略异常
result.get(propagate=False)traceback
: 获取异常, 无异常为None
result.traceback
执行流程
这里添加下100个任务
[dome.wait.delay(60) for _ in range(100)] |
这时Redis
会存在3个键, 实际上执行dome.add.delay(1,2)
也有, 但任务已经被取走执行了, 观察不到过程.
celery
: 里面存放的是待执行的任务队列unacked
: 里面是存放的待确认的任务队列(已被分配给Workers
的任务)
unacked 表示为未收到ack(确认讯息)unacked_index
: 优先队列, 存放的是unacked
相应的索引
任务在生成时会被添加到任务队列里, 这时任务调度器没有运行的话, 你会看到100个任务在任务队列安安静静的躺着.
因为电脑CPU有16核, 默认会创建16个worker
, 在添加完任务后的60秒内, unacked
里面应该是存在着64条待确认任务(每个worker
分配5条), 而任务队列里面有20条待执行任务, 剩下的16条在worker
中执行.
任务执行完, worker
会拿unacked
里的任务执行, 任务队列里会继续把任务分配给有空闲的worker
, 直到任务都执行完.
如果强行中断程序的话, unacked
里的任务会重新被塞回任务队列里面, 而正在执行中任务则会被丢失, 这是极不推荐的做法, 正确的做法是给celery
进程发送TERM
信号, 这样woeker
会等待任务完成后再关闭.
Task
默认使用
|
参数说明
- name: 可以显式指定任务的名字, 默认是模块的命名空间中本函数的名字
- serializer: 指定本任务的序列化的方法
- bind: 一个bool值, 设置是否绑定一个task的实例, 如果绑定, task实例会作为参数传递到任务方法中, 可以访问task实例的所有的属性, 即前面反序列化中那些属性
- base: 定义任务的基类, 可以以此来定义回调函数, 默认是Task类
- default_retry_delay: 设置该任务重试的延迟时间, 当任务执行失败后, 会自动重试, 单位是秒, 默认3分钟
- autoretry_for: 设置在特定异常时重试任务, 默认False即不重试
- retry_backoff: 默认False, 设置重试时的延迟时间间隔策略
- retry_backoff_max: 设置最大延迟重试时间, 默认10分钟, 如果失败则不再重试
- retry_jitter: 默认True, 即引入抖动, 避免重试任务集中执行
函数要求
- 方法应该是等幂的
- 默认情况下Celery不会执行任何重试逻辑
- 重试的话需要根据Celery说明配置
- 涉及
I/O
的操作应该做好超时处理
重试
错误重试最好不用加在Celery里面, 会使程序设计复杂, 耦合度升高
取消任务
revoke()
result = add.apply_async(args=[2, 2], countdown=120) |
使用任务id取消
from proj.celery import app |
定时任务
crontab docs -> https://docs.celeryq.dev/en/stable/reference/celery.schedules.html
配置时区
timezone: 默认为UTC(version>3.0)
值为pytz
库支持的时区 -> pytz.common_timezones
app.conf.timezone = 'Asia/Shanghai' |
enable_utc: 设置为 false 时,将使用系统本地时区.
app.conf.enable_utc = False |
Dome
import time |
运行celery
celery -A dome worker -l info |
加上 -B
会同时启动 定时任务
celery -A dome worker -l info -B |
启动定时任务
celery -A dome beat -l info |
使用配置文件
创建config
文件夹, 然后创建 celery.py
和__init__.py
文件
. |
修改celery.py
, 把前面的配置转移进来
配置文档 -> https://docs.celeryq.dev/en/stable/userguide/configuration.html
from random import randint |
修改dome.py
, 使用 config_from_object()
加载配置文件
import time |
其它
查看 Workers 状态
celery -A <workers> inspect active