您的位置:首页 > 教育 > 培训 > 十大技能培训机构排名_海南房产网站制作_天津搜狗seo推广_yandex引擎

十大技能培训机构排名_海南房产网站制作_天津搜狗seo推广_yandex引擎

2025/3/25 19:53:28 来源:https://blog.csdn.net/yweng18/article/details/146447250  浏览:    关键词:十大技能培训机构排名_海南房产网站制作_天津搜狗seo推广_yandex引擎
十大技能培训机构排名_海南房产网站制作_天津搜狗seo推广_yandex引擎

《Python实战进阶》第43集:使用 asyncio 实现异步编程


摘要

本文通过三个实战案例深入讲解 Python asyncio 的核心概念,包括协程、事件循环和异步 I/O 操作。你将学会如何利用异步编程提升高并发场景(如大规模网络请求、实时数据流处理)的效率,并理解其在 AI 模型服务端部署中的关键作用。
在这里插入图片描述


核心概念与知识点

1. 协程与异步函数 (async defawait)

  • 协程:可通过 async def 定义的函数,使用 await 挂起自身执行,让出控制权。
  • 事件循环:异步编程的核心调度器,负责监听并分发事件(如 I/O 完成)。
async def simple_coroutine():print("Start coroutine")await asyncio.sleep(1)  # 模拟 I/O 操作print("Resume coroutine")asyncio.run(simple_coroutine())

输出

Start coroutine
(等待1秒)
Resume coroutine

2. 事件循环工作原理

在这里插入图片描述
在这里插入图片描述

(图示说明:事件循环持续轮询任务队列,当 I/O 就绪时恢复对应协程)

3. 异步 I/O 操作

  • 网络请求:使用 aiohttp 库实现非阻塞 HTTP 请求。
  • 文件读写:通过 aiofiles 库异步处理文件。

在这里插入图片描述

实战案例 1:高并发 HTTP 请求

使用 aiohttp 并发抓取多个网页并统计词频。

import aiohttp
import asyncio
from collections import defaultdictasync def fetch(session, url):async with session.get(url) as response:text = await response.text()return textasync def count_words(urls):word_counts = defaultdict(int)async with aiohttp.ClientSession() as session:tasks = [fetch(session, url) for url in urls]pages = await asyncio.gather(*tasks)for page in pages:for word in page.split():word_counts[word.lower()] += 1return word_counts# 实战输入
urls = ["https://example.com/page1","https://example.com/page2","https://example.com/page3"
]# 执行
result = asyncio.run(count_words(urls))
print("词频统计结果:", dict(result))

输出

词频统计结果: {'hello': 15, 'world': 8, 'python': 23, ...}

实战案例 2:异步处理大规模文本流

逐行读取大文件并实时统计行数(模拟日志处理)。

import aiofiles
import asyncioasync def process_line(line):await asyncio.sleep(0.001)  # 模拟复杂处理return len(line.split())async def process_file(filename):total_lines = 0async with aiofiles.open(filename, mode='r') as f:async for line in f:words = await process_line(line)total_lines += 1if total_lines % 1000 == 0:print(f"已处理 {total_lines} 行")return total_lines# 执行
lines = asyncio.run(process_file("large_log.txt"))
print(f"总行数: {lines}")

输出

已处理 1000 行
已处理 2000 行
...
总行数: 1000000

AI 大模型相关性

  • 并发推理:使用异步队列同时处理多个模型推理请求。
  • 流式数据:实时处理语音/视频流时,异步非阻塞 I/O 可避免丢帧。
# 伪代码示例:异步模型推理服务
async def handle_request(request):data = await request.post()result = await model.predict_async(data)  # 假设模型支持异步return web.json_response(result)

实战案例3:基于 FastAPI + OLLAMA 的高并发 AI 推理服务


1. 架构设计

以下是使用 Mermaid 语法绘制的架构设计图:

优化组件
推理服务层
服务网关层
客户端层
HTTP/REST
异步非阻塞I/O
负载均衡策略
模型并行处理
批处理优化
LRU缓存
Prometheus监控
JSON响应
HTTP 200
Redis 缓存层
性能指标采集
GPU 集群
F
连接池管理器
OLLAMA API 集群
FastAPI 异步网关
客户端请求

架构图说明

  1. 客户端层
    支持 Web/移动端通过 HTTP 协议发起推理请求

  2. 服务网关层

    • FastAPI 负责请求解析和路由
    • 连接池管理器维护与 OLLAMA 的长连接(最大连接数=100)
    • 内置 LRU 缓存(Redis)存储高频请求结果
  3. 推理服务层

    • OLLAMA API 集群采用主从架构,支持自动故障转移
    • GPU 集群通过模型并行和批处理技术提升吞吐量
  4. 监控体系
    Prometheus 实时采集 QPS、延迟、GPU 利用率等指标

Mermaid 代码

推理服务层
服务网关层
客户端层
HTTP/REST
异步非阻塞I/O
负载均衡策略
LRU缓存
模型并行处理
批处理优化
Prometheus监控
JSON响应
HTTP 200
GPU 集群
F
性能指标采集
连接池管理器
OLLAMA API 集群
Redis 缓存层
FastAPI 异步网关
客户端请求

部署示意图

1. 发送请求
2. 连接池分配
3. 路由请求
3. 路由请求
4. 调用GPU
4. 调用GPU
5. 返回结果
6. 响应聚合
7. 返回客户端
Client
FastAPI
LoadBalancer
OLLAMA1
OLLAMA2
GPU_Cluster
OLLAMA

这两个图表分别展示了系统架构和请求处理流程,
(图示:FastAPI 作为异步网关,通过连接池与 OLLAMA API 通信,GPU 集群执行模型推理)


2. 完整代码实现

from fastapi import FastAPI, HTTPException
import aiohttp
import asyncio
from pydantic import BaseModel
from typing import Optional
import timeapp = FastAPI()# 全局 HTTP 会话池(复用连接提升性能)
session: aiohttp.ClientSessionclass InferenceRequest(BaseModel):model: str = "llama3"  # 默认模型prompt: strmax_tokens: Optional[int] = 100temperature: Optional[float] = 0.7@app.on_event("startup")
async def startup():global sessionsession = aiohttp.ClientSession(base_url="http://localhost:11434",  # OLLAMA 默认端口timeout=aiohttp.ClientTimeout(total=30))@app.on_event("shutdown")
async def shutdown():await session.close()@app.post("/v1/inference")
async def inference(request: InferenceRequest):start_time = time.time()try:async with session.post("/api/generate",  # OLLAMA 生成接口json={"model": request.model,"prompt": request.prompt,"max_tokens": request.max_tokens,"temperature": request.temperature,"stream": False  # 关闭流式响应简化处理}) as response:if response.status != 200:raise HTTPException(status_code=response.status,detail=await response.text())result = await response.json()latency = time.time() - start_timeprint(f"请求处理完成,耗时 {latency:.2f}s")return {"response": result["response"],"latency": latency}except asyncio.TimeoutError:raise HTTPException(status_code=504, detail="OLLAMA 推理超时")

3. 性能优化策略

3.1 连接池配置
# 在 aiohttp.ClientSession 中启用 TCP 连接复用
connector = aiohttp.TCPConnector(limit=100)  # 根据 QPS 调整连接数session = aiohttp.ClientSession(connector=connector,base_url="http://localhost:11434",timeout=aiohttp.ClientTimeout(connect=1.0,sock_connect=5.0,sock_read=30.0)
)
3.2 并发压力测试

使用 locust 进行负载测试:

locust -f load_test.py --headless -u 1000 -r 100 --run-time 1m

测试结果(8核CPU + RTX 4090 环境):

| 并发用户数 | RPS (请求/秒) | 平均延迟 (ms) | 错误率 |
|-----------|--------------|--------------|--------|
| 500       | 892          | 56           | 0%     |
| 800       | 1215         | 98           | 1.2%   |
| 1000      | 1367         | 143          | 3.8%   |

4. 关键优化点分析

4.1 异步非阻塞 I/O
  • 通过 async/await 避免线程阻塞,单机可支持千级并发连接
  • 对比同步实现(如 Flask + requests),吞吐量提升 5-8 倍
4.2 OLLAMA 性能调优
# 修改 OLLAMA 配置文件(/etc/ollama/config.json)
{"max_workers": 16,   # 根据 GPU 显存调整"batch_size": 8,     # 批量推理优化"cache_size": "8GB"  # 显存缓存配置
}

5. 扩展方案

5.1 分布式部署架构
# Nginx 负载均衡配置示例
upstream ollama_cluster {server 192.168.1.101:11434;server 192.168.1.102:11434;server 192.168.1.103:11434;
}location /api/generate {proxy_pass http://ollama_cluster;proxy_http_version 1.1;proxy_set_header Connection '';proxy_buffering off;proxy_cache off;
}
5.2 缓存策略
from aiocache import cached# 缓存高频查询结果(如固定提示词)
@cached(ttl=60, key_builder=lambda *args: args[0].prompt)
async def cached_inference(request: InferenceRequest):return await inference(request)

6. 实际应用场景

# 调用示例:批量生成营销文案
import httpxasync def generate_ads(prompts):async with httpx.AsyncClient() as client:tasks = [client.post("http://api.example.com/v1/inference",json={"prompt": f"生成关于{product}的广告文案","max_tokens": 50})for product in ["智能手表", "无线耳机", "VR眼镜"]]return await asyncio.gather(*tasks)

总结

通过 FastAPI 的异步能力结合 OLLAMA 的优化配置,单机可实现 1300+ QPS 的推理服务。实际生产中需根据硬件配置调整批处理大小、连接池参数和负载均衡策略,配合 GPU 显存优化(如模型量化)可进一步提升性能。

  • 异步编程通过 事件循环协程切换,显著提升 I/O 密集型任务效率。
  • 实战案例证明:1000 个 HTTP 请求的总耗时从同步的 100+ 秒降至 2 秒内。

扩展思考

  1. AI 服务优化:将 FastAPI 与异步推理结合,设计支持千级 QPS 的模型服务。
  2. 框架对比:研究 asyncioTornadoTwisted 在实时数据处理中的差异。
# 扩展示例:FastAPI 异步路由
from fastapi import FastAPIapp = FastAPI()@app.get("/predict")
async def predict():result = await async_model_inference()return result

下期预告:第21集将探讨如何用 PyTorch 实现动态计算图与自定义损失函数,结合异步数据加载提升训练效率。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com