昨天写了 【最后203篇系列】025 FastAPI+Celery,大致把我对celery的研究历史梳理了一遍,然后按照最新的方法重新包了一次,明确了其定位,使用方法。
定位:celery用于解决数据IO和流转问题。
使用方法:
- 1 对于外部使用者,不想与之产生过多的交互和规范对接,可以用伪实时接口(接口用异步事件轮询);
- 2 对于内部使用者,可以采用webhook的方式,处理完成时,会通知到webhook服务,然后通过这个服务进行对应的分发。
部署方案:还是采用单点redis服务作为队列,然后在不同的服务器上启动celery worker。这样通过在多台机器,用docker方式进行部署,可以很快的弹性增加其能力。
架构技术面:
- 1 功能基于微服务。例如,一个微服务用于从网上实时获取数据,这个功能本身被封装为了微服务。所以celery可以触发这个微服务工作;当然,如果被获取的网站允许直接访问的话,那么celery也可以直接去执行。其实不管是celery还是微服务,本身都是用异步方式构建的,问题不大。
- 2 基于kafka缓冲和分发。如何确保多个celery worker不去干重复的事?背后的任务通过kafka缓存,每个worker通过一个单线程的分发器来获取数据(kafka的速度很快,单线程也可以轻松在每秒上万的速度)。
- 3 全局锁方法。这次暂时不考虑,以后可以为每个任务设置一个锁,worker启动时递增这个锁计数,worker执行完毕时减少锁计数,从而确保每个任务不会因为某些原因挂起太多worker。
- 4 硬盘KV缓存。每个worker启动时,应该有自己的唯一编号作为Key,然后采用RocksDB作为硬盘缓存。这样,worker虽然被设计为无状态,但是通过硬盘KV缓存仍然可以是有状态的。这些状态允许worker有更聪明的做事方法,在普通的固态硬盘下,存取一条数据大概在5ms左右(我对数据都做了json封装,会慢一点)。也就是说,每秒可以允许200个worker存取状态。
- 5 定时任务。一种方法是通过FastAPI+APScheduler来进行长期的运行管理,此时就需要通过某个界面来看到定义的任务,运行的任务及状态,还有就是通过按钮进行交互式控制。另一种方式比较简单,通过kuma这样的工具,每30秒唤起一次worker,worker通过读取自己的元数据判断是否需要工作。(按照时间或者事件)
业务体验:
- 1 以30秒的节奏,获取数百个外界数据,发现CPU和内存的开销非常小。
- 2 打开kuma或者自定义的前端,可以很快看见所执行的任务。在异常的时候可以告警。
- 3 容量更大了。因为celery worker分布在不同机器上,那么收到同一ip限速的情况会得到很大改善。(虽然我的请求频次和数据都不大)
本篇的着重点在于构建FastAPI和celery的连接,这样就可以通过api方式发布、查询和控制任务
的运行了。最后,还应当给到一个ORM(就叫FastCelery),使得任务的操作非常简便。然后这个话题应该就告一段落,这个周末也算搞了点有用的东西。
需要两个文件,一个文件构造FastAPI和APScheduler,另一个文件构造服务及调用。
server.py
用于驱动提交celery任务和查询任务状态。
from fastapi import FastAPI
from celery.result import AsyncResult
from pydantic import BaseModel
from typing import Optional# 建议将导入的模块组织得更清晰
from celery_app import celery_app # 假设这是你的Celery实例app = FastAPI()# 模型定义
class TaskData(BaseModel):task_name: strtask_params: Optional[dict] = Noneclass TaskID(BaseModel):task_id: str# 路由
@app.post("/run_task/")
async def run_task(taskdata: TaskData):"""提交任务到Celery"""kwargs = taskdata.task_params if taskdata.task_params is not None else {}result = celery_app.send_task(taskdata.task_name, kwargs=kwargs)return {"task_id": result.id}@app.post("/get_task/")
async def get_task_status(taskid: TaskID):"""获取任务状态"""task_result = AsyncResult(taskid.task_id, app=celery_app)return {"task_id": taskid.task_id,"status": task_result.status,"result": task_result.result,}if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
调试
import httpx # 发布任务
response = httpx.post("http://127.0.0.1:8000/run_task/",json={ 'task_name': 'celery_app.process_task',"task_params": {'test1': "test222 data"}}
)
print(response.json()) # 输出任务 IDtask_json = response.json()
# 发布任务
response = httpx.post("http://127.0.0.1:8000/get_task/",json=task_json
)
print(response.json()) # 输出结果一开始没好,是pending状态
{'task_id': 'b3cba149-2e81-4491-b436-fec88bf340bf'}
{'task_id': 'b3cba149-2e81-4491-b436-fec88bf340bf', 'status': 'PENDING', 'result': None}然后就是success
{'task_id': 'b3cba149-2e81-4491-b436-fec88bf340bf', 'status': 'SUCCESS', 'result': 'Processed: test222 data'}
测试之前的微服务调用任务,也是没有问题的
import httpx webhook_url="https://your-webhook.example.com/notify"
test_url ="https://httpbin.org/post"
task ={"url": test_url,"json_data": {"key": "value"},"timeout": 5.0,'webhook_url':webhook_url
}# 发布任务
response = httpx.post("http://127.0.0.1:8000/run_task/",json={ 'task_name': 'celery_app.http_request',"task_params":task}
)
print(response.json()) # 输出任务 IDtask_json = response.json()response = httpx.post("http://127.0.0.1:8000/get_task/",json=task_json
)
print(response.json()) # 输出结果
客户端
{'task_id': 'e7e9313c-9cfa-49c1-8feb-a0a587536552', 'status': 'SUCCESS', 'result': {'args': {}, 'data': '{"key": "value"}', 'files': {}, 'form': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Content-Length': '16', 'Content-Type': 'application/json', 'Host': 'httpbin.org', 'User-Agent': 'python-httpx/0.27.2', 'X-Amzn-Trace-Id': 'Root=1-67e95111-696561ec4aff8e407d09bb34'}, 'json': {'key': 'value'}, 'origin': '45.126.120.54', 'url': 'https://httpbin.org/post'}}celery端
[2025-03-30 22:11:31,129: WARNING/MainProcess] Webhook回调失败: [Errno -2] Name or service not known
[2025-03-30 22:11:31,214: INFO/MainProcess] Task celery_app.http_request[e7e9313c-9cfa-49c1-8feb-a0a587536552] succeeded in 2.510464843013324s: {'args': {}, 'data': '{"key": "value"}', 'files': {}, 'form': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Content-Length': '16', 'Content-Type': 'application/json', 'Host': 'httpbin.org', 'User-Agent': 'python-httpx/0.27.2', 'X-Amzn-Trace-Id': 'Root=1-67e95111-696561ec4aff8e407d09bb34'}, 'json': {'key': 'value'}, 'origin': '45.126.120.54', 'url': 'https://httpbin.org/post'}服务端
INFO: 127.0.0.1:47164 - "POST /run_task/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:47174 - "POST /get_task/ HTTP/1.1" 200 OK
到这里,其实通过kuma已经可以完成任务的定时调用了。再接下来就是用FastAPI+APScheduler实现定时任务,从而方便地将一些任务进行长期的运行。
想了一下,我觉得最重要的还是不要烂尾,所以先进行使用封装,定时任务搞了好几遍了,先不花时间
- Step1: 将现有的版本发布为微服务
- Step2: 使用对象封装
部署:worker是不需要端口的,配置指定了中间队列(redis); server需要指定一个端口,用来向worker发送指令。使用相同的配置文件,是因为在服务向celery_app传递信息时,需要使用一致的消息队列(redis)。
# Celery Worker(Pool)
docker run --name=worker_celery_24165 \-d --restart=always \-v /etc/localtime:/etc/localtime -v /etc/timezone:/etc/timezone -v /etc/hostname:/etc/hostname -e "LANG=C.UTF-8" \-v /opt/container_cfgs/celery_24165/base_config.py:/workspace/base_config.py \YOURS\sh -c "celery -A celery_app.celery_app worker --loglevel=info --pool=gevent --concurrency=100" # A Server
docker run --name=server_celery_34165 \-d --restart=always \-p YOURS:8000 \-v /etc/localtime:/etc/localtime -v /etc/timezone:/etc/timezone -v /etc/hostname:/etc/hostname -e "LANG=C.UTF-8" \-v /opt/container_cfgs/celery_24165/base_config.py:/workspace/base_config.py \YOURS \sh -c "uvicorn server:app --host 0.0.0.0 --port 8000 --workers 3"
测试
import httpx # server_url = 'http://127.0.0.1:8000'
server_url = 'http://YOURS'# 发布任务
response = httpx.post(f"{server_url}/run_task/",json={ 'task_name': 'celery_app.process_task',"task_params": {'test1': "test222 data"}}
)
print(response.json()) # 输出任务 IDtask_json = response.json()
# 发布任务
response = httpx.post(f"{server_url}/get_task/",json=task_json
)
print(response.json()) # 输出结果{'task_id': '17dc615d-2564-42cb-82fb-5b8ed2d76c15'}
{'task_id': '17dc615d-2564-42cb-82fb-5b8ed2d76c15', 'status': 'PENDING', 'result': None}
seconds later ..
{'task_id': '17dc615d-2564-42cb-82fb-5b8ed2d76c15', 'status': 'SUCCESS', 'result': 'Processed: test222 data'}
WCelery.py
最后则是用一个对象把这个操作串联起来,简化
import httpx
from pydantic import BaseModelclass HttpTaskParams(BaseModel):url: str json_data : dict timeout : float =5.0webhook_url: str = None class WCelery:def __init__(self, celery_agent_url = 'YOURS/',webhook_url = None):self.celery_agent_url = celery_agent_urlself.webhook_url = webhook_urlself.task_id = None def send_a_http_task(self, task_params = None):task_params1 = HttpTaskParams(**task_params)# 允许使用if self.webhook_url:task_params1.webhook_url = self.webhook_urlresp = httpx.post(self.celery_agent_url + 'run_task/', json ={'task_name': 'celery_app.http_request',"task_params":task_params1.dict(exclude_none=True)}).json()self.task_id = resp['task_id']return resp def get_a_task(self,task_id = None):the_task_id = task_id or self.task_idassert the_task_id != None,'任务ID不能为空'return httpx.post(self.celery_agent_url + 'get_task/', json ={'task_id':the_task_id }).json()if __name__ == '__main__':webhook_url="https://your-webhook.example.com/notify"test_url ="https://httpbin.org/post"task ={"url": test_url,"json_data": {"key": "value"},"timeout": 5.0,'webhook_url':webhook_url}wcelery = WCelery()wcelery.send_a_http_task(task_params=task)wcelery.get_a_task()resp = httpx.post(self.celery_agent_url + 'run_task/', json ={'task_name': 'celery_app.http_request',"task_params":task_params1.dict(exclude_none=True)}).json()
Out[4]: {'task_id': 'f74b9040-cabc-4efb-a330-9930aa51cba9'}In [5]: wcelery.get_a_task()
Out[5]:
{'task_id': 'f74b9040-cabc-4efb-a330-9930aa51cba9','status': 'SUCCESS','result': {'args': {},'data': '{"key": "value"}','files': {},'form': {},'headers': {'Accept': '*/*','Accept-Encoding': 'gzip, deflate, br, zstd','Content-Length': '16','Content-Type': 'application/json','Host': 'httpbin.org','User-Agent': 'python-httpx/0.27.2','X-Amzn-Trace-Id': 'Root=1-67ec002a-42e1cb4812f9ad0c206e4c40'},'json': {'key': 'value'},'origin': 'xxx.xxx.xxx.xxx','url': 'https://httpbin.org/post'}}
当然,因为webhook是我胡编的,后台应该有一个webhook失败的日志,但可以不理会。
一个实际的应用场景是:数据流构建
- 1 我构造一个特定的微服务
- 2 用WCelery发送服务测试,看任务是否会生效
- 3 如果生效,那么对应数据应该已经通过webhook服务被分发到kafka了(数据需要遵守一定规范)
这样就把之前解耦的工程又灵活的连起来了。