目录
- 代码
- 代码解释
- 1. 基础设置
- 2. 客户端初始化
- 3. 数据模型定义
- 4. 缓存设置
- 5. 缓存装饰器
- 6. 示例函数
- 工作流程
- 示例
- 类似例子
代码
import functools
import inspect
import instructor
import diskcachefrom openai import OpenAI, AsyncOpenAI
from pydantic import BaseModelclient = instructor.from_openai(OpenAI(api_key = "your api key",base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"))aclient = instructor.from_openai(AsyncOpenAI(api_key = "your api key",base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"))model_name = "qwen-turbo"class UserDetail(BaseModel):name: strage: intcache = diskcache.Cache("./my_cache_directory")def instructor_cache(func):"""Cache a function that returns a Pydantic model"""return_type = inspect.signature(func).return_annotationif not issubclass(return_type, BaseModel):raise ValueError("The return type must be a Pydantic model")is_async = inspect.iscoroutinefunction(func)@functools.wraps(func)def wrapper(*args, **kwargs):key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}"# Check if the result is already cachedif (cached := cache.get(key)) is not None:# Deserialize from JSON based on the return typeif issubclass(return_type, BaseModel):return return_type.model_validate_json(cached)# Call the function and cache its resultresult = func(*args, **kwargs)serialized_result = result.model_dump_json()cache.set(key, serialized_result)return result@functools.wraps(func)async def awrapper(*args, **kwargs):key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}"# Check if the result is already cachedif (cached := cache.get(key)) is not None:# Deserialize from JSON based on the return typeif issubclass(return_type, BaseModel):return return_type.model_validate_json(cached)# Call the function and cache its resultresult = await func(*args, **kwargs)serialized_result = result.model_dump_json()cache.set(key, serialized_result)return resultreturn wrapper if not is_async else awrapper@instructor_cache
def extract(data) -> UserDetail:return client.chat.completions.create(model=model_name,response_model=UserDetail,messages=[{"role": "user", "content": data},],) # type: ignore@instructor_cache
async def aextract(data) -> UserDetail:return await aclient.chat.completions.create(model=model_name,response_model=UserDetail,messages=[{"role": "user", "content": data},],) # type: ignore
代码解释
1. 基础设置
import functools
import inspect
import instructor
import diskcache
from openai import OpenAI, AsyncOpenAI
from pydantic import BaseModel
这部分导入了必要的库:
diskcache
: 用于磁盘缓存instructor
: 用于增强 OpenAI API 的功能pydantic
: 用于数据验证和序列化
2. 客户端初始化
client = instructor.from_openai(OpenAI(...))
aclient = instructor.from_openai(AsyncOpenAI(...))
创建了两个客户端:
- 同步客户端
client
- 异步客户端
aclient
3. 数据模型定义
class UserDetail(BaseModel):name: strage: int
定义了用户详情的数据模型,包含名字和年龄字段。
4. 缓存设置
cache = diskcache.Cache("./my_cache_directory")
创建了一个磁盘缓存实例,用于存储函数调用结果。
5. 缓存装饰器
def instructor_cache(func):
这是核心的缓存装饰器,主要功能:
- 检查返回类型是否为 Pydantic 模型
- 根据函数是否为异步选择不同的包装器
- 实现缓存逻辑
缓存逻辑包括:
- 生成缓存键
- 检查缓存是否存在
- 如果存在则返回缓存结果
- 如果不存在则执行函数并缓存结果
6. 示例函数
@instructor_cache
def extract(data) -> UserDetail:return client.chat.completions.create(...)@instructor_cache
async def aextract(data) -> UserDetail:return await aclient.chat.completions.create(...)
两个示例函数:
extract
: 同步函数aextract
: 异步函数
都使用了缓存装饰器,用于从文本中提取用户信息。
工作流程
- 当调用这些函数时,装饰器首先检查缓存
- 如果找到缓存,直接返回缓存的结果
- 如果没有缓存,调用 OpenAI API 获取结果
- 将结果缓存到磁盘并返回
这种缓存机制可以:
- 减少 API 调用次数
- 降低成本
- 提高响应速度
- 减少重复计算
示例
# 同步测试
import timestart = time.perf_counter()
model = extract("John introduced himself as a 29-year-old developer")
print(f"Time taken: {time.perf_counter() - start}")
print(f"model:{model}")
print("-" * 80)start = time.perf_counter()
model = extract("John introduced himself as a 29-year-old developer")
print(f"Time taken: {time.perf_counter() - start}")
print(f"model_cache:{model}")
print("-" * 80)
Time taken: 1.371705400000792
model:name='John' age=29
--------------------------------------------------------------------------------
Time taken: 0.00026250000519212335
model_cache:name='John' age=29
--------------------------------------------------------------------------------
async def atest_extract():start = time.perf_counter()model = await aextract("John introduced himself as a 29-year-old developer")print(f"Time taken: {time.perf_counter() - start}")print(f"model:{model}")print("-" * 80)start = time.perf_counter()model = await aextract("John introduced himself as a 29-year-old developer")print(f"Time taken: {time.perf_counter() - start}")print(f"model_cache:{model}")print("-" * 80)await atest_extract()
Time taken: 1.3256608999945456
model:name='John' age=29
--------------------------------------------------------------------------------
Time taken: 4.610000178217888e-05
model_cache:name='John' age=29
--------------------------------------------------------------------------------
类似例子
from pydantic import BaseModel, Field
import instructor
from openai import OpenAIclass ArticleSummary(BaseModel):title: strsummary: strkey_points: list[str]word_count: int@instructor_cache
def extract_summary(article: str) -> ArticleSummary:return client.chat.completions.create(model="qwen-turbo",response_model=ArticleSummary,messages=[{"role": "user", "content": f"Summarize this article: {article}"}])article = """
人工智能正在改变我们的生活方式。从智能手机助手到自动驾驶汽车,
AI技术已经渗透到各个领域。然而,这也带来了一些挑战,
比如隐私保护和就业问题需要我们认真思考。
"""
summary = extract_summary(article)
print(summary)
title='人工智能的发展与挑战' summary='人工智能正在深刻地改变我们的生活方式,其技术已广泛应用于各个领域。然而,这种快速发展也伴随着一些挑战,特别是隐私保护和就业问题需要我们深入思考和解决。' key_points=['人工智能改变生活方式', 'AI技术广泛应用', '面临的挑战如隐私保护和就业问题'] word_count=30
class SentimentAnalysis(BaseModel):text: strsentiment: str = Field(description="positive, negative, or neutral")confidence: float = Field(ge=0, le=1)keywords: list[str]@instructor_cache
async def analyze_sentiment(text: str) -> SentimentAnalysis:return await aclient.chat.completions.create(model="qwen-turbo",response_model=SentimentAnalysis,messages=[{"role": "user", "content": f"Analyze the sentiment: {text}"}])# 情感分析
async def analyze():result = await analyze_sentiment("这个产品非常好用,超出我的预期!")print(result)await analyze()
text='这个产品非常好用,超出我的预期!' sentiment='positive' confidence=0.99 keywords=['产品', '好用', '超出预期']
class ProductInfo(BaseModel):name: strprice: floatcategory: strfeatures: list[str]rating: float = Field(ge=0, le=5)@instructor_cache
def extract_product(description: str) -> ProductInfo:return client.chat.completions.create(model="qwen-turbo",response_model=ProductInfo,messages=[{"role": "user", "content": f"Extract product information: {description}"}])product_desc = "最新款iPhone 15 Pro,支持5G,售价999美元,深空黑色,256GB存储"
product = extract_product(product_desc)
print(product)
name='iPhone 15 Pro' price=999.0 category='Electronics' features=['Latest model', '5G support', 'Deep Space Black color', '256GB storage'] rating=4.5
class Address(BaseModel):country: strprovince: strcity: strstreet: strpostal_code: str = Field(default="")@instructor_cache
async def parse_address(address_text: str) -> Address:return await aclient.chat.completions.create(model="qwen-turbo",response_model=Address,messages=[{"role": "user", "content": f"Parse this address: {address_text}"}])await parse_address("北京市海淀区中关村大街1号")
Address(country='中国', province='北京市', city='北京市', street='中关村大街1号', postal_code='')
参考链接:https://github.com/instructor-ai/instructor/tree/main