Celery 快速上手指南-Python篇

Celery 一个强大的分布式任务调度器, 来了解一下它独特的魅力吧~

0%

Github -> https://github.com/celery/celery
Docs -> https://docs.celeryq.dev/en/stable/index.html

简单了解

  • Celery 是一个异步任务队列, 用于分布式调度任务
  • 组件
    • Celery Beat 任务调度器
      用于发布任务到任务队列
    • Celery Workers 执行任务的消费者
      默认情况下 Celery 会在不同进程执行任务, 可以分布到多台机器
    • Broker 消息代理
      用来传递任务的, 通常搭配 RabbitMQ or Redis 使用
    • Producer 任务生产者
      调用Celery提供的API进行任务添加
    • Result Backend 结果后端
      保存了任务结果以供查询

安装

pip install -U Celery

这里用 Redis 作为传递消息队列和后端, 可以直接安装捆绑包.

pip install -U "celery[redis]"

例子

创建文件 dome.pytest.py

.
├── dome.py
└── test.py

dome.py

import time
from celery import Celery

# redis://:password@hostname:port/db_number
broker = 'redis://localhost:6379/2' # 任务队列
backend = 'redis://localhost:6379/3' # 后端(存储任务结果)

# 第一个参数是生成任务名的前缀, 不影响后续操作
# result_expires 参数是后端结果保存时间
app = Celery('test', broker=broker, backend=backend, result_expires=120)

@app.task # 装饰add函数成为 Task 实例
def add(x, y):
return x + y

@app.task
def wait(wait):
time.sleep(wait)
return wait

运行Celery

celery -A dome worker -l info -c 8
  • -A: 指定创建的celery对象的位置
    dome里有初始化后的实例, worker表示该实例是任务执行者
  • -l: 输出的日志等级
  • -c: 运行的worker数量(并发), 默认和CPU核心数一样

测试

任务调用 -> https://docs.celeryq.dev/en/stable/userguide/calling.html

进入pyhton交互模式或编写test.py测试文件测试都可

import dome
  • delay(): 用于异步调用任务

    # 返回的是 AsyncResult 实例
    dome.add.delay(1,2)
  • apply_async(): 和delay()作用一样, 但支持更多操作

    dome.add.apply_async(args=[1,2])
  • s()

  • ready(): 返回任务完成状态

    result.ready()
  • get(): 获取任务结果(执行过程是阻塞的)

    # 设置超时, 超时未完成抛出 TimeoutError
    result.get(timeout=1)

    # 忽略异常
    result.get(propagate=False)
  • traceback: 获取异常, 无异常为None

    result.traceback

执行流程

这里添加下100个任务

[dome.wait.delay(60) for _ in range(100)]

这时Redis会存在3个键, 实际上执行dome.add.delay(1,2)也有, 但任务已经被取走执行了, 观察不到过程.

  • celery: 里面存放的是待执行的任务队列
  • unacked: 里面是存放的待确认的任务队列(已被分配给Workers的任务)
    unacked 表示为未收到ack(确认讯息)
  • unacked_index: 优先队列, 存放的是unacked相应的索引

任务在生成时会被添加到任务队列里, 这时任务调度器没有运行的话, 你会看到100个任务在任务队列安安静静的躺着.

因为电脑CPU有16核, 默认会创建16个worker, 在添加完任务后的60秒内, unacked里面应该是存在着64条待确认任务(每个worker分配5条), 而任务队列里面有20条待执行任务, 剩下的16条在worker中执行.

任务执行完, worker会拿unacked里的任务执行, 任务队列里会继续把任务分配给有空闲的worker, 直到任务都执行完.

如果强行中断程序的话, unacked里的任务会重新被塞回任务队列里面, 而正在执行中任务则会被丢失, 这是极不推荐的做法, 正确的做法是给celery进程发送TERM信号, 这样woeker会等待任务完成后再关闭.

Task

默认使用

@app.task
def func():
pass

参数说明

  • name: 可以显式指定任务的名字, 默认是模块的命名空间中本函数的名字
  • serializer: 指定本任务的序列化的方法
  • bind: 一个bool值, 设置是否绑定一个task的实例, 如果绑定, task实例会作为参数传递到任务方法中, 可以访问task实例的所有的属性, 即前面反序列化中那些属性
  • base: 定义任务的基类, 可以以此来定义回调函数, 默认是Task类
  • default_retry_delay: 设置该任务重试的延迟时间, 当任务执行失败后, 会自动重试, 单位是秒, 默认3分钟
  • autoretry_for: 设置在特定异常时重试任务, 默认False即不重试
  • retry_backoff: 默认False, 设置重试时的延迟时间间隔策略
  • retry_backoff_max: 设置最大延迟重试时间, 默认10分钟, 如果失败则不再重试
  • retry_jitter: 默认True, 即引入抖动, 避免重试任务集中执行

函数要求

  • 方法应该是等幂的
    • 默认情况下Celery不会执行任何重试逻辑
    • 重试的话需要根据Celery说明配置
  • 涉及I/O的操作应该做好超时处理

重试

错误重试最好不用加在Celery里面, 会使程序设计复杂, 耦合度升高

...

取消任务

revoke()

result = add.apply_async(args=[2, 2], countdown=120)
result.revoke()

使用任务id取消

from proj.celery import app
app.control.revoke(task_id)

定时任务

crontab docs -> https://docs.celeryq.dev/en/stable/reference/celery.schedules.html

配置时区

timezone: 默认为UTC(version>3.0)
值为pytz库支持的时区 -> pytz.common_timezones

app.conf.timezone = 'Asia/Shanghai'

enable_utc: 设置为 false 时,将使用系统本地时区.

app.conf.enable_utc = False

Dome

import time
from random import randint

from celery import Celery # pip install -U Celery
from celery.schedules import crontab

broker = 'redis://localhost:6379/2'
backend = 'redis://localhost:6379/3'

app = Celery('dome', broker=broker, backend=backend, result_expires=120)

app.conf.timezone = 'Asia/Shanghai'
app.conf.beat_schedule = {
'add_schedule': {
'task': 'dome.add',
'schedule': 3.0, # 每3秒执行一次
'args': (16, 16)
},
'wait_schedule': {
'task': 'dome.wait', # 注意模块名要一致
'schedule': crontab(), # 每分钟执行一次
'args': (randint(2,20),) # 注意参数是元组
}
}

@app.task
def wait(wait):
time.sleep(wait)
return wait

@app.task
def add(x, y):
return x + y

运行celery

celery -A dome worker -l info

加上 -B 会同时启动 定时任务

celery -A dome worker -l info -B

启动定时任务

celery -A dome beat -l info

使用配置文件

创建config文件夹, 然后创建 celery.py__init__.py文件

.
├── conf
│ ├── celery.py
│ └── __init__.py
├── dome.py
└── test.py

修改celery.py, 把前面的配置转移进来

配置文档 -> https://docs.celeryq.dev/en/stable/userguide/configuration.html

from random import randint

from celery.schedules import crontab

broker_url = 'redis://localhost:6379/2'
result_backend = 'redis://localhost:6379/3'

result_expires = 120 # 结果存储时间

retry_policy = 5.0

# Worker接收超时, 超时没有确认时任务会被分配到另一个Worker
broker_transport_options = {'visibility_timeout': 600}

timezone = 'Asia/Shanghai'
beat_schedule = {
'add_schedule': {
'task': 'dome.add',
'schedule': 3.0, # 每3秒执行一次
'args': (16, 16)
},
'wait_schedule': {
'task': 'dome.wait', # 注意模块名要一致
'schedule': crontab(), # 每分钟执行一次
'args': (randint(2,20),) # 注意参数是元组
}
}

修改dome.py, 使用 config_from_object() 加载配置文件

import time

from celery import Celery

app = Celery('dome')
app.config_from_object('conf.celery') # 导入配置

@app.task
def wait(wait):
time.sleep(wait)
return wait

@app.task
def add(x, y):
return x + y

其它

  • 查看 Workers 状态

    celery -A <workers> inspect active
------------ 已触及底线了 感谢您的阅读 ------------
  • 本文作者: OWQ
  • 本文链接: https://www.owq.world/73d2d4df/
  • 版权声明: 本站所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处( ̄︶ ̄)↗