1. Celery Core Mechanisms Explained

1.1 The Four Elements of a Distributed Architecture
BROKER_URL = 'redis://:password@localhost:6379/0'
RESULT_BACKEND = 'redis://:password@localhost:6379/1'
TASK_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
TIMEZONE = 'Asia/Shanghai'
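These settings take effect only once a Celery application loads them. A minimal sketch of wiring them up, assuming the block above is saved as celeryconfig.py:

from celery import Celery

app = Celery('proj')
app.config_from_object('celeryconfig')  # picks up BROKER_URL, RESULT_BACKEND, etc.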
Comparison of the core components:

Component | Role                   | Common implementations
----------+------------------------+-----------------------
Broker    | Task message delivery  | RabbitMQ / Redis
Worker    | Task execution node    | Celery Worker
Backend   | Result storage         | Redis / PostgreSQL
Monitor   | Task monitoring        | Flower / Prometheus
1.2 Your First Distributed Task
# tasks.py
from celery import Celery

app = Celery('demo', broker='redis://localhost:6379/0')

@app.task
def send_email(to, content):
    import time
    time.sleep(3)  # simulate a slow email gateway
    return f"Email to {to} sent: {content[:20]}..."
Quick verification:
# start a worker (assumes the code above is saved as tasks.py)
celery -A tasks worker --loglevel=info

# then, from a Python shell:
from tasks import send_email
result = send_email.delay('user@example.com', 'Your order #1234 has shipped!')
print(result.get(timeout=10))
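result.get() blocks until the task completes. For fire-and-forget flows you can inspect the AsyncResult handle without blocking; a short sketch (the message text is arbitrary):

from tasks import send_email

result = send_email.delay('user@example.com', 'hello')
print(result.id, result.state)   # e.g. PENDING, then SUCCESS
if result.ready():               # non-blocking completion check
    print(result.result)         # the task's return value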
2. Advanced Celery Techniques

2.1 Designing Complex Workflows
@app.task
def validate_order(order_id):
    return {'order_id': order_id, 'status': 'valid'}

@app.task
def process_payment(order_info):
    return {**order_info, 'paid': True}

@app.task
def ship_order(payment_result):
    return {**payment_result, 'tracking_no': 'EXPRESS123'}

from celery import chain

order_chain = chain(
    validate_order.s(1001),
    process_payment.s(),
    ship_order.s()
).apply_async()
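chain runs the three steps strictly in sequence. Celery's canvas also provides group for parallel fan-out and chord for running a callback once every task in a group finishes. A sketch reusing validate_order; summarize is a hypothetical aggregation task added for illustration:

from celery import chord, group

@app.task
def summarize(results):  # hypothetical fan-in task
    return {'validated': len(results)}

# validate three orders in parallel, then hand the list of results to summarize
workflow = chord(group(validate_order.s(oid) for oid in (1001, 1002, 1003)))(summarize.s())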
2.2 Task Monitoring and Alerting
@app.task(bind=True, max_retries=3)
def risky_operation(self):
    try:
        1 / 0
    except Exception as exc:
        # exponential backoff: retry after 1s, 2s, 4s
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

from celery.signals import task_failure

@task_failure.connect
def alert_on_failure(sender=None, task_id=None, **kwargs):
    import requests
    requests.post('https://your-alert-endpoint', json={
        'task': sender.name,
        'error': str(kwargs.get('exception'))
    })
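Since Celery 4, the manual self.retry pattern above can also be declared on the decorator with autoretry_for and retry_backoff. A minimal sketch; fetch_status is a made-up example task:

import requests

@app.task(autoretry_for=(requests.RequestException,), retry_backoff=True,
          retry_backoff_max=600, max_retries=3)
def fetch_status(url):
    # retried automatically with exponential backoff on any requests error
    return requests.get(url, timeout=5).status_code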
3. Building a Distributed Monitoring System

3.1 System Architecture Design
            +----------------+
            |   Flask API    |
            +-------+--------+
                    | trigger monitoring tasks
                    v
+-------------+          +-----------------+
|    Redis    |<---------+   Celery Beat   |
+------+------+          +--------+--------+
   ^   |                          |
   |   | store tasks              | dispatch tasks
   |   v                          v
+--+---+------+          +--------+--------+
|   Worker1   |          |     Worker2     |
| (HTTP check)|          |  (disk check)   |
+-------------+          +-----------------+
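The Flask API box only enqueues tasks; the workers do the actual checking. A minimal sketch of such a trigger endpoint (route path and payload shape are assumptions):

from flask import Flask, jsonify, request
from monitor_tasks import check_http_endpoint

api = Flask(__name__)

@api.route('/monitor/http', methods=['POST'])
def trigger_http_check():
    url = request.json['url']
    task = check_http_endpoint.delay(url)  # hand off to a worker, return immediately
    return jsonify({'task_id': task.id}), 202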
3.2 Implementing the Core Monitoring Tasks
import os
import time
import requests
import paramiko

@app.task
def check_http_endpoint(url):
    start = time.time()
    try:
        resp = requests.get(url, timeout=10)
        return {
            'url': url,
            'status': 'UP' if resp.ok else 'DOWN',
            'response_time': time.time() - start
        }
    except Exception as e:
        return {'url': url, 'error': str(e)}

@app.task
def check_disk_usage(host):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # paramiko does not expand '~', so resolve the key path explicitly
    client.connect(host, username='monitor',
                   key_filename=os.path.expanduser('~/.ssh/monitor_key'))
    stdin, stdout, stderr = client.exec_command('df -h /')
    output = stdout.read().decode()
    client.close()
    return parse_disk_output(output)
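parse_disk_output is called above but never defined; one plausible implementation, as a sketch:

def parse_disk_output(output):
    # `df -h /` prints a header line followed by a single data line:
    # Filesystem  Size  Used  Avail  Use%  Mounted on
    fields = output.strip().splitlines()[1].split()
    return {
        'filesystem': fields[0],
        'size': fields[1],
        'used': fields[2],
        'available': fields[3],
        'use_percent': fields[4],
    }

Scheduling both checks with Celery Beat: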
from celery.schedules import crontab

app.conf.beat_schedule = {
    'check-homepage-every-5m': {
        'task': 'monitor_tasks.check_http_endpoint',
        'schedule': crontab(minute='*/5'),
        'args': ('https://www.yourdomain.com',)
    },
    'daily-disk-check': {
        'task': 'monitor_tasks.check_disk_usage',
        'schedule': crontab(hour=3, minute=0),
        'args': ('server01',)
    }
}
4. Hands-On: A Visual Monitoring Dashboard

4.1 Real-Time Monitoring with Flower
celery -A monitor_tasks flower --port=5555
Open http://localhost:5555 to see:

- Real-time task execution status
- Load on each worker node
- Historical task statistics and charts
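Flower exposes the same information over a REST API, which is handy for scripting; a sketch against the port used above (exact response fields vary by Flower version):

import requests

# /api/tasks returns a dict keyed by task id
tasks = requests.get('http://localhost:5555/api/tasks', timeout=5).json()
for task_id, info in tasks.items():
    print(task_id, info.get('state'), info.get('runtime'))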
4.2 Prometheus Integration
from prometheus_client import start_http_server, Counter
from celery.signals import task_prerun, task_failure

TASKS_STARTED = Counter('celery_tasks_started', 'Total tasks started')
TASKS_FAILED = Counter('celery_tasks_failed', 'Total tasks failed')

@task_prerun.connect
def count_task_start(sender=None, **kwargs):
    TASKS_STARTED.inc()

@task_failure.connect
def count_task_failure(sender=None, **kwargs):
    TASKS_FAILED.inc()

start_http_server(8000)  # expose metrics at http://<worker-host>:8000/metrics
5. Production Environment Best Practices

5.1 Deployment Architecture Optimization
[program:celery_worker]
command=celery -A proj worker --loglevel=info --concurrency=4
directory=/opt/yourproject
autostart=true
autorestart=true

[program:celery_beat]
command=celery -A proj beat
directory=/opt/yourproject
autostart=true
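After placing this file in Supervisor's config directory (commonly /etc/supervisor/conf.d/), reload it so both programs come up:

supervisorctl reread
supervisorctl update
supervisorctl status celery_worker celery_beat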
5.2 Security Hardening
app.conf.result_backend_transport_options = {
    'visibility_timeout': 3600,
    'signed_data': True
}

app.conf.task_routes = {
    'critical_tasks.*': {'queue': 'secure'},
    '*.default': {'queue': 'regular'}
}
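Routing to a 'secure' queue only has an effect if dedicated workers consume it, for example started with the -Q flag:

celery -A proj worker -Q secure --concurrency=2 --loglevel=info
celery -A proj worker -Q regular --concurrency=8 --loglevel=info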
6. Advancing Your Knowledge

6.1 Extended Learning Paths
- Message queues in depth: RabbitMQ vs Kafka
- Containerized deployment: Docker + Kubernetes
- Distributed tracing: OpenTelemetry
- Automatic scaling: Celery Autoscale
6.2 Recommended Toolchain

Tool category         | Recommended option
----------------------+----------------------
Message queue         | RabbitMQ
Monitoring            | Prometheus + Grafana
Task visualization    | Flower
Deployment management | Supervisor / Docker