Celery 是一个基于分布式消息传递的异步任务队列系统,主要用于处理耗时任务、定时任务和周期性任务。它能够将任务分配到多个工作节点(Worker)上执行,从而提高应用程序的性能和可扩展性。Celery 是 Python 生态中最流行的任务队列工具之一,广泛应用于 Web 开发、数据处理、机器学习等领域。
核心概念
1. 任务(Task):
任务是 Celery 的基本执行单元,通常是一个 Python 函数。
任务可以是同步的,也可以是异步的。异步任务会被放入任务队列中,由 Worker 执行。
2. 消息队列(Broker):
Celery 使用消息队列来传递任务。常见的消息队列包括:
RabbitMQ(推荐)
Redis
Amazon SQS
任务会被发送到消息队列中,等待 Worker 处理。
3. Worker:
Worker 是执行任务的进程。它会从消息队列中获取任务并执行。
可以启动多个 Worker 来提高并发处理能力。
4. 后端(Backend):
后端用于存储任务执行的结果。常见后端包括:
Redis
RabbitMQ
数据库(如 PostgreSQL、MySQL)
任务执行完成后,结果会被存储在后端中,供后续查询。
5. 定时任务(Periodic Tasks):
Celery 支持定时任务和周期性任务,通常与 Celery Beat 结合使用。
可以通过配置文件或代码定义任务的执行时间。
主要特点
1. 异步执行:
将耗时任务(如发送邮件、处理文件、调用外部 API)放入任务队列中异步执行,避免阻塞主程序。
2. 分布式:
支持多台机器上的 Worker 协同工作,适合高并发场景。
3. 可扩展:
通过增加 Worker 的数量,可以轻松扩展任务处理能力。
4. 定时任务:
支持定时任务和周期性任务,适合定时数据同步、报表生成等场景。
5. 任务重试:
支持任务失败后的自动重试机制,提高任务的可靠性。
6. 结果存储:
任务执行结果可以存储在后端中,方便查询和后续处理。
典型应用场景
1. Web 开发:
处理耗时请求,如发送邮件、生成 PDF、处理上传文件等。
例如,用户注册后异步发送欢迎邮件。
2. 数据处理:
处理大量数据,如数据清洗、数据分析、机器学习模型训练等。
3. 定时任务:
定时执行任务,如每天凌晨生成报表、定时备份数据等。
4. 分布式计算:
将任务分发到多台机器上执行,适合大规模计算任务。
基本使用示例以下是一个简单的 Celery 使用示例:1. 安装 Celery
bash
pip install celery2. 创建 Celery 应用
pythontasks.py
from celery import Celery创建 Celery 实例
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')定义任务
@app.task
def add(x, y):return x + y3. 启动 Worker
bash
celery A tasks worker loglevel=info4. 调用任务
python调用异步任务
result = add.delay(4, 6)获取任务结果
print(result.get())Celery 与 Flask 集成在 Flask 中,可以使用 FlaskCeleryExt 或直接集成 Celery。例如:python
from flask import Flask
from celery import Celerydef create_app():app = Flask(__name__)app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/1'celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])celery.conf.update(app.config)app.extensions["celery"] = celeryreturn appapp = create_app()
celery = app.extensions["celery"]@celery.task
def my_task():return "Task executed!"
Celery 是一个强大的异步任务队列工具,适用于处理耗时任务、定时任务和分布式计算。它的核心组件包括任务、消息队列、Worker 和后端。通过与 Flask 等 Web 框架集成,可以轻松实现异步任务处理,提升应用程序的性能和用户体验。