0、前言
本篇是《celery 系列》的开篇,本系列共四篇,包括:
- 《celery 系列:celery是什么?(1)》
- 《celery 系列:重试机制(2)》
- 《celery 系列:过期时间和时间限制(3)》
- 《celery 系列:flower监控任务(4)》
- 《celery 系列:celery使用示例(5)》
说明:
- celery 版本5.3.6
- 文章的内容基于我在django中应用celery
1、简介
celery是一个由python编写的分布式任务队列系统,用于处理比较耗时异步任务,比如发送邮件、短信等任务;还支持任务的调度,用于定时任务。它具有以下特点:
- 使用简单:Celery 使用和维护都非常简单,并且不需要配置文件。
- 高可用:woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持"双主"或者"主/从"的方式实现高可用。
- 快速:单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)
- 灵活:Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。
2、基本概念
-
任务(Task):任务是 Celery 的基本单位,代表要异步执行的工作单元。你可以将任何 Python 函数转化为任务,并将其发送到 Celery 任务队列中执行。
-
任务队列(Task Queue):任务队列是任务存储和传递的地方。 不过celery本身并没有实现任务队列,而是需要通过消息代理的方式。 Celery 支持多种消息代理(如 RabbitMQ、Redis)来实现任务队列。
-
任务执行单元(Worker):工作节点是执行任务的实际工作进程。Celery Worker会监听任务队列,获取并执行任务。
-
消息代理(Broker):消息代理是管理任务队列的系统。常见的消息代理有 RabbitMQ 和 Redis。
-
结果后端(Result Backend):结果后端是存储任务执行结果的地方。Celery 支持多种结果后端(如 Redis、RabbitMQ 、SQLAlchemy)
3、架构
消息代理(Broker)
:消息代理是 Celery 系统中的核心组件之一,负责接收、排队和分发任务消息。客户端将任务消息发送到消息代理,消息代理将这些消息保存到队列中,然后分发给可用的工作者进行处理。- 推荐的消息代理:redis,rabbitmq
任务执行单元(worker)
:worker是 Celery 系统中负责执行任务的组件。它从消息代理中获取任务消息,然后执行这些任务。worker可以运行在多个服务器上,实现任务的并行处理和负载均衡。结果存储(Result Backend)
:结果存储是 Celery 系统中用于存储任务执行结果的组件。用户可以查询任务的执行状态和结果。- 常用的:redis,rabbitmq,sqlalchemy
- 注意:当一个任务 执行完后 ,你会发现在redis中有这样形式的key,
celery-task-meta-bdd49f8b-0fca-4f3c-b847-ed689652eef8
,形式celery-task-meta-uuid
。
4、我所理解的原理
- 在你程序任意地方调用写好的
celery异步任务
,即基本概念
中的任务(Task)
,那么该任务就会加入到基本概念
中的任务队列(Task Queue)
中。 - 那么这个任务队列是什么呢?其实就是
结构
中的消息代理
。celery本身没有实现任务队列,需要借助消息代理实现。那么这个消息代理我们就可以用redis或者是rabbitmq实现。 - celery是需要单独运行的,其实就是启动celery worker,worker会从消息代理中拿到任务然后去执行。
- worker执行任务后会将结果存储到设置的
Result Backend
,以redis为例,此时你会看到一个形式如celery-task-meta-bdd49f8b-0fca-4f3c-b847-ed689652eef8
的key,代表一个任务执行完后的元数据。它的结构是这样的celery-task-meta-{uuid}
5、使用流程
- 配置celery
- 创建celery应用
- 定义异步任务
- 调用异步任务
当然,不要忘了运行celery。
6、celery常用的配置项
CELERY_TASK_SERIALIZER
:任务序列化器,用于指定任务消息的序列化方式。json
:常用,默认这个好了。pickle
yaml
msgpack
CELERY_RESULT_SERIALIZER
:结果序列化器,指定任务执行结果的序列化方式。json
:常用,默认这个好了。pickle
yaml
msgpack
CELERY_ACCEPT_CONTENT
:接受的内容类型列表,用于指定可以接受的序列化类型。该配置可以配置为列表的形式,['pickle', 'json']
json
:常用pickle
yaml
msgpack
CELERY_TIMEZONE
:时区设置,用于指定任务的时区信息。如Asia/Shanghai
CELERY_RESULT_EXPIRE
:指定任务执行结果的过期时间,超过这个时间后,结果将被自动删除或标记为过期,以节省存储空间和资源。它是一个整数,单位秒。CELERY_RESULT_BACKEND
:任务执行结果的存储后端,形式如'redis://127.0.0.1:6379/1'
CELERY_BROKER_URL
:消息代理的 URL,形式如'redis://127.0.0.1:6379/1'
- rabbitmq是这样
'amqp://guest@localhost//'
,amqp协议即代表rabbitmq
- rabbitmq是这样
7、celery 应用(app)
当然,在使用celery之前,需要创建一个 Celery 应用实例(app)。
- 设置环境变量:由于
celery
是独立的项目,所以必须导入django
环境 - 实例化Celery:
app = Celery('celery_tasks')
,接受一个name,这个name在运行的时候需要。 - 配置celery:
app.config_from_object
,读取celery配置 - 加载异步任务:
app.autodiscover_tasks
,这一步是celery发现异步任务。
8、任务(task)
8.1 定义任务
定义任务可以用@app.task 和 @shared_task
装饰器将一个普通函数定义为celery任务。区别在于:
-
@app.task
是直接在 Celery 应用实例上注册任务的一种方式。这意味着任务是直接绑定到特定的 Celery 应用实例的。 -
任务是直接绑定到特定的 Celery 应用实例上。
- 适合简单的项目结构或者单个 Celery 应用的情况。
-
任务的定义和应用实例是紧密耦合的
from celery_task import app # 这里的celery_task指的是celery应用文件
@app.task def add(x, y): return x + y
-
@shared_task
是一种独立于特定 Celery 应用实例的任务注册方式。这使得任务可以在多个 Celery 应用实例中共享。 -
任务定义与特定的 Celery 应用实例解耦。
- 适合更复杂的项目结构,尤其是多个 Celery 应用实例的情况。
- 任务可以在多个 Celery 应用实例之间共享。
-
有助于将任务定义与应用实例配置分离,提高代码的重用性和模块化。
from celery import shared_task # 这里的celery指的是celery库
@shared_task def add(x, y): return x + y
8.2 调用任务
两种方式,用法都是任务名.
:
apply_async
: 是一个功能强大的方法,允许你在调用任务时设置更多参数,例如countdown
,eta
,expires
,retry
,retry_policy
等。delay
: 是apply_async
的快捷方式,简化了任务调用,但不支持直接传递额外参数如过期时间。比如add.delay(1, 2)
返回的都是一个 AsyncResult
对象。
8.3 AsyncResult对象
这个对象包含了有关任务的信息,包括任务的状态、任务的 ID 以及任务的结果。AsyncResult
对象的一些常用属性和方法:
- task_id: 返回任务的唯一 ID
- status:返回任务的当前状态,例如 PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED 等。
- ready():用于检查任务是否已经完成(无论是成功还是失败)。
- 如果任务已经完成,
ready()
返回True
。 - 如果任务还在执行中或者尚未开始,
ready()
返回False
。
- 如果任务已经完成,
- successful():用于检查任务是否成功完成。
- 只有在任务成功完成后,
successful()
才返回True
。 - 如果任务失败或还在执行中,
successful()
返回False
。
- 只有在任务成功完成后,
- result:直接访问任务的结果。
- 如果任务已经完成(无论成功还是失败),
result
属性将返回任务的结果或异常。 - 如果任务还没有完成,访问
result
属性将不会阻塞,它会立即返回。 - 如果任务失败,
result
属性将返回异常信息。
- 如果任务已经完成(无论成功还是失败),
- get(timeout=None):阻塞调用,直到任务完成并返回结果。
- 如果在指定时间内任务没有完成,将抛出
TimeoutError
异常。 - 如果任务成功完成,
get
方法将返回任务结果。 - 如果任务失败,
get
方法将抛出异常,异常类型取决于任务的具体实现。
- 如果在指定时间内任务没有完成,将抛出
- wait(timeout=None):类似于
get
方法,阻塞调用直到任务完成并返回结果。- 如果在指定时间内任务没有完成,将抛出
TimeoutError
异常。 - 与
get
方法不同,wait
方法不会抛出任务执行时的异常,而是返回异常对象。 wait
方法常用于不需要捕获具体异常,只关心任务结果的场景。
- 如果在指定时间内任务没有完成,将抛出
- traceback:返回任务失败时的完整回溯信息(如果有)。
8.4 撤销任务
可以用revoke
方法来终止celery任务,有两种形式,看你具体需求:
- 使用任务的
AsyncResult
对象来调用revoke
方法:这个意思就是当调用delay创建任务后会返回一个AsyncResult
对象,该对象有revoke
方法,直接调用好了,不用传参。 app.control.revoke(task_id, terminate=True)
,第一个参数delay方法返回的AsyncResult
对象的id,即任务id;第二个terminate=True
代表立即停止。
9、celery运行模式
注意,celery
需要单独运行,运行celery其实就是启动worker。 celery提供了多种不同的方式来运行:
celery -A name worker -l info
:默认perfork pool
的方式运行,类似python的multiprocessing.pool
。- 在windows下使用该方式运行celery,celery收到任务但不执行,建议使用
gevent
或eventlet
的方式。 - 原因在于
prefork pool
基于进程的forking
,但windows
支持生成进程,而不支持进程forking
。
- 在windows下使用该方式运行celery,celery收到任务但不执行,建议使用
celery -A name worker -P solo -l info
:Solo 是一个单线程的执行模型,仅用于调试或开发环境,不适用于生产环境。celery -A name worker -P eventlet -l info
:以协程的方式来处理并发任务,需要安装eventlet
库。- 指定数量,再加上
-c 20
,即20个worker
- 指定数量,再加上
celery -A name worker -P gevent -l info
:以协程的方式来处理并发任务,需要安装gevent
库。- 指定数量,再加上
-c 20
,即20个worker
- 指定数量,再加上
celery -A name worker -P threads -l info
:使用线程池来处理并发任务- 指定数量,再加上
-c 20
,即20个worker
- 指定数量,再加上
以celery -A name worker -l info -P eventlet -c 20
运行celery做说明:
-A system
:指定 Celery 应用的名称为name
。这个name实际就是你创建celery app中的nameworker
:命令指示启动一个 Celery Worker。-l info
:设置日志级别为info
,输出详细的日志信息。-P eventlet
:使用eventlet
作为并发库,这是一个异步 I/O 库,用于处理并发任务。-c 20
:指定启动 20 个并发工作者(Worker),即同时处理 20 个任务。
添加日志:再增加- f
celery -A name worker -l info -P eventlet -c 20 -f path
:path就是日志的路径
10、celery 日志
celery
有自己的日志,不能使用其他的。
-
get_task_logger:接受一个name,即日志的名称,然后记录就跟python的logging库一样了。
from celery.utils.log import get_task_logger
logger = get_task_logger('name')
11、在Celery启动时删除以前的任务队列中的所有任务
这个为什么单独拿出来说呢,可能因为某些原因任务积压在队列中,当重新运行celery时,这些任务会从任务队列中拿出来执行,这可能不是我们想要的结果,所以可以在运行celery运行前先将这些积压的任务删除,两种方式:
celery -A name purge
:先执行这条命令,然后再执行运行celery的命令celery -A name purge -f && celery -A name worker -l info
按照我测试的结果,这条命令删除的任务 可能是
在任务队列但是还没有运行,即处于started
状态的任务,如果任务处于started
状态,是不会被删除的。
评论列表,共 0 条评论
暂无评论