您的位置:首页 > 游戏 > 手游 > 网站制作培训中心_西安疫情最新消息今天封城_济南seo的排名优化_市场调研的步骤

网站制作培训中心_西安疫情最新消息今天封城_济南seo的排名优化_市场调研的步骤

2024/12/23 22:12:40 来源:https://blog.csdn.net/itnerd/article/details/144546590  浏览:    关键词:网站制作培训中心_西安疫情最新消息今天封城_济南seo的排名优化_市场调研的步骤
网站制作培训中心_西安疫情最新消息今天封城_济南seo的排名优化_市场调研的步骤

在现代应用开发中,异步任务处理是一个常见的需求。无论是数据处理、图像生成,还是复杂的计算任务,异步执行都能显著提升系统的响应速度和吞吐量。今天,我们将通过一个实际项目,探索如何使用 FastAPICeleryRedis 构建一个高性能的异步任务引擎。

项目背景

技术栈介绍

  • FastAPI:一个现代、高性能的 Web 框架,基于 Python 3.7+ 的异步编程特性构建。它支持自动生成 OpenAPI 文档和 Swagger UI,能够快速构建 RESTful API,并且具有极低的延迟和高并发处理能力。
  • Celery:一个分布式任务队列系统,主要用于处理异步任务和定时任务。它支持多种消息传输机制,能够将任务分发到多个工作节点上并行处理,从而提高系统的吞吐量和响应速度。
  • Redis:一个高性能的键值存储系统,常用于缓存、消息队列和分布式锁等场景。在 Celery 中,Redis 通常作为消息代理(Broker)和结果存储(Backend),负责任务的分发和结果的持久化。

项目目标

通过 FastAPI、Celery 和 Redis 的结合,构建一个能够高效处理用户提交的 Python 代码的异步任务引擎。用户可以通过 API 提交代码,系统会异步执行代码,并返回任务的执行结果。

项目目录结构

project/
├── main.py
├── utils.py
├── schemas.py
└── app/├── __init__.py├── config.py└── tasks/├── __init__.py└── tasks.py

代码功能深度解析

1. main.py:FastAPI 应用的核心

main.py 是项目的核心入口文件,负责定义 FastAPI 应用的接口和逻辑。

FastAPI 应用初始化
app = FastAPI(title="Async Task API", description="", version="1.0.0")

这里我们创建了一个 FastAPI 应用,命名为 Async Task API,版本为 1.0.0

自定义 Swagger UI
def swagger_monkey_patch(*args, **kwargs):return get_swagger_ui_html(*args,**kwargs,swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui-bundle.js",swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui.min.css",)
applications.get_swagger_ui_html = swagger_monkey_patch

通过 Monkey Patch 的方式,我们自定义了 Swagger UI 的资源加载路径,使用了国内的 CDN 加速资源,提升文档加载速度。

全局异常处理
@app.exception_handler(Exception)
def validation_exception_handler(request, err):base_error_message = f"Failed to execute: {request.method}: {request.url}"return JSONResponse(status_code=400, content={"message": f"{base_error_message}. Detail: {err}"})

我们定义了一个全局异常处理器,捕获所有未处理的异常,并返回一个包含错误信息的 JSON 响应。

HTTP 中间件:计算请求处理时间
@app.middleware("http")
async def add_process_time_header(request, call_next):start_time = time.time()response = await call_next(request)process_time = time.time() - start_timeresponse.headers["X-Process-Time"] = str(f'{process_time:0.4f} sec')return response

这个中间件用于计算每个请求的处理时间,并将处理时间添加到响应头 X-Process-Time 中,方便调试和性能优化。

创建任务的 API
@app.post('/tasks')
def create_pytask(task: schemas.PyTask):code = task.codetime_limit = task.time_limitexpires = task.expiresresult = execute_python_code.apply_async(args=(code,), time_limit=time_limit, expires=expires)return JSONResponse(content={"task_id": result.id})

用户可以通过 /tasks 接口提交 Python 代码,代码会被异步执行。任务的执行结果可以通过 /tasks/{task_id} 接口查询。

查询任务结果的 API
@app.get('/tasks/{task_id}', response_model=schemas.PyTaskResult)
def get_task_result(task_id: str):return get_task_info(task_id)

用户可以通过 /tasks/{task_id} 接口查询任务的执行结果和状态。

2. utils.py:任务信息获取工具

utils.py 文件定义了一个工具函数 get_task_info,用于获取 Celery 任务的状态和结果。

def get_task_info(task_id):task_result = AsyncResult(task_id, app=app)result = {"task_id": task_id,"task_status": task_result.status,"task_result": task_result.result}return result

通过 AsyncResult,我们可以获取任务的当前状态(如 PENDINGSUCCESSFAILURE 等)和执行结果。

3. schemas.py:数据模型定义

schemas.py 文件定义了 Pydantic 模型,用于验证和序列化请求和响应的数据。

任务请求模型
class PyTask(BaseModel):code: strexpires: Optional[int] = Nonetime_limit: Optional[int] = None

用户提交的任务请求包含以下字段:

  • code: 任务的 Python 代码。
  • expires: 任务的过期时间(可选)。
  • time_limit: 任务的时间限制(可选)。
任务结果模型
class PyProgramResult(BaseModel):status: stroutput: Optional[str] = Noneerror: Optional[str] = None

任务的执行结果包含以下字段:

  • status: 任务的执行状态(如 successfailure)。
  • output: 任务的标准输出(可选)。
  • error: 任务的错误输出(可选)。
任务结果响应模型
class PyTaskResult(BaseModel):task_id: strtask_status: strtask_result: Optional[PyProgramResult] = None

任务的查询结果包含以下字段:

  • task_id: 任务的 ID。
  • task_status: 任务的状态(如 PENDINGSUCCESS 等)。
  • task_result: 任务的执行结果(可选)。

4. app/__init__.py:Celery 应用初始化

app/__init__.py 文件是 Celery 应用的初始化文件,主要用于配置 Celery 应用和任务的自动发现。

创建 Celery 应用
app = Celery('my_celery_project')

我们创建了一个名为 my_celery_project 的 Celery 应用。

加载配置
app.config_from_object('app.config')

app.config 文件中加载 Celery 的配置。

自动发现任务
app.autodiscover_tasks(['app.tasks'])

自动发现 app.tasks 模块中的任务。

Worker 和 Beat 初始化
@worker_init.connect
def worker_initialization(**kwargs):print("Worker 初始化开始")@beat_init.connect
def beat_initialization(**kwargs):print("Beat 初始化开始")

定义了 Worker 和 Beat 的初始化函数,分别在 Worker 和 Beat 启动时执行。

5. app/config.py:Celery 配置

app/config.py 文件定义了 Celery 的配置。

消息代理和结果存储
broker_url = 'redis://:redisisthebest@redis:6379/0'
result_backend = 'redis://:redisisthebest@redis:6379/0'

使用 Redis 作为消息代理和结果存储。

任务结果过期时间
result_expires = 3600

任务结果在 Redis 中保存 1 小时后过期。

序列化配置
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

使用 JSON 作为任务和结果的序列化格式。

时区配置
timezone = 'Asia/Shanghai'
enable_utc = True

设置时区为 Asia/Shanghai,并启用 UTC 时间。

6. app/tasks/tasks.py:任务执行逻辑

app/tasks/tasks.py 文件定义了一个 Celery 任务 execute_python_code,用于执行用户提交的 Python 代码。

@app.task
def execute_python_code(code_string):temp_file = "temp_code.py"with open(temp_file, "w") as f:f.write(code_string)try:result = subprocess.run(["python3", temp_file],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)if result.stderr:return {"status": "failure", "error": result.stderr}else:return {"status": "success", "output": result.stdout}finally:if os.path.exists(temp_file):os.remove(temp_file)

该任务将用户提交的代码字符串保存为临时文件,然后使用 subprocess.run 执行该文件,捕获标准输出和错误输出。如果执行成功,返回 success 状态和标准输出;如果执行失败,返回 failure 状态和错误输出。最后,删除临时文件。

部署分析

version: '3.8'services:fastapi:image: lab:python-packagescontainer_name: fastapiports:- 8080:8080volumes:- D:\dockerMount\code\celery:/home/codeworking_dir: /home/codecommand: python3 main.pyrestart: unless-stoppednetworks:- mynetcelery-worker:image: lab:python-packagescontainer_name: celery-workervolumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app worker --concurrency=4 --loglevel=inforestart: unless-stoppednetworks:- mynetcelery-flower:image: lab:python-packagescontainer_name: celery-flowerports:- 5555:5555volumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app flower --port=5555restart: unless-stoppednetworks:- mynetredis:image: bitnami/redis:7.2.4-debian-12-r16container_name: redisenvironment:- REDIS_PASSWORD=redisisthebestnetworks:- mynetnetworks:mynet:external: false

在这个 Docker Compose 配置中,我们定义了三个服务:

  • fastapi:FastAPI 应用,负责接收用户请求并分发任务。
  • celery-worker:Celery 工作节点,负责执行异步任务。
  • celery-flower:Celery 的监控工具,提供任务执行的可视化界面。
  • redis:Redis 服务,作为 Celery 的消息代理和结果存储。

代码的功能和价值

功能

  1. 异步任务执行

    • 用户可以通过 /tasks 接口提交 Python 代码,代码会被异步执行。
    • 任务的执行结果可以通过 /tasks/{task_id} 接口查询。
  2. 任务状态管理

    • 任务的状态(如 PENDINGSUCCESSFAILURE 等)可以通过 /tasks/{task_id} 接口查询。
  3. 高性能和可扩展性

    • 使用 FastAPI 和 Celery 构建的异步任务引擎能够处理高并发的任务请求。
    • Celery 的分布式特性使得系统可以轻松扩展以应对更多的任务。
  4. 安全性

    • 通过设置 time_limitexpires,可以限制任务的执行时间和过期时间,防止恶意代码的长时间执行。
  5. 易用性

    • FastAPI 自动生成的 Swagger UI 使得 API 的使用和调试更加方便。
    • Pydantic 模型确保了请求和响应数据的类型安全。

价值

  1. 高效的任务处理

    • 该系统能够高效地处理大量异步任务,适用于需要异步执行代码的场景,如在线代码执行、数据处理、图像处理等。
  2. 可扩展性

    • 通过 Celery 的分布式任务队列,系统可以轻松扩展以处理更多的任务,适合高并发场景。
  3. 安全性

    • 通过限制任务的执行时间和过期时间,系统能够有效防止恶意代码的滥用。
  4. 易用性

    • FastAPI 和 Pydantic 的结合使得 API 的开发和维护更加简单,同时提供了自动生成的文档和类型检查。
  5. 灵活性

    • 系统支持自定义任务的执行逻辑,可以根据业务需求扩展任务类型和功能。

总结

通过 FastAPI、Celery 和 Redis 的结合,我们构建了一个高性能、可扩展的分布式异步任务引擎。它能够高效地处理用户提交的 Python 代码,并提供任务状态查询功能。该系统适用于需要异步执行代码的场景,具有高效、安全、易用和灵活的特点。

无论是构建一个在线代码执行平台,还是处理复杂的计算任务,这个项目都为你提供了一个强大的基础。希望这篇文章能为你带来启发,让你在异步任务处理的道路上走得更远!

附图

发送任务
在这里插入图片描述
查询结果
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com