OpenAI API 使用
- 1、文档
- 2、单个请求
- 3、batch批量请求
- 4、aiohttp和 asyncio实现异步批量
1、文档
openai:
官方文档:https://platform.openai.com/docs/quickstart
官方python文档:https://github.com/openai/openai-python
batch批量请求:
- https://platform.openai.com/docs/guides/batch/overview
- https://github.com/SpellcraftAI/oaib
2、单个请求
1. 官方 SDK 的方式:client.chat.completions.create
import os

from openai import OpenAI

# Provide the API key through the environment; replace 'you_key' with your
# own OpenAI key.
os.environ['OPENAI_API_KEY'] = 'you_key'
# Confirm the variable is set WITHOUT echoing the secret to the console/logs.
print('OPENAI_API_KEY 是否已设置:', bool(os.getenv('OPENAI_API_KEY')))

# The client is constructed without arguments, so it relies on the
# OPENAI_API_KEY environment variable set above.
client = OpenAI()

# True  -> the reply arrives incrementally as chunks.
# False -> a single complete response object is returned.
stream = True

# Alternative English prompt for experimenting:
#   system: "You are a helpful assistant."
#   user:   "Write a haiku about recursion in programming."
completion = client.chat.completions.create(
    model="gpt-4",
    max_tokens=200,
    temperature=0.5,
    messages=[
        {"role": "system", "content": "你是一位作家."},
        {"role": "user", "content": "写一篇小说,关于猫的."},
    ],
    stream=stream,
)

if stream:
    pieces = []
    for chunk in completion:
        # Guard against chunks that carry no choices, which would otherwise
        # raise IndexError on `choices[0]`.
        if not chunk.choices:
            continue
        content = chunk.choices[0].delta.content
        # `delta.content` can be None (e.g. on the closing chunk); skip those.
        if content:
            print(content, end="")
            pieces.append(content)
    # Join once at the end instead of repeated string concatenation.
    output_text = ''.join(pieces)
    print('\n', '*' * 100)
    print('接收到内容:\n', output_text)
else:
    print(completion)
    print(completion.choices[0].message)
2. requests 直接请求 URL 的方式:参考官方的 curl 示例,可先在 Postman 上调试
import json
import os

import requests

url = "https://api.openai.com/v1/chat/completions"

# Prefer the key from the environment so it is not committed with the script;
# otherwise replace the placeholder with your own OpenAI key.
api_key = os.getenv("OPENAI_API_KEY", "sk-you_key")

# Single switch shared by the request body and the response handling so the
# two can never disagree (False: one JSON document, True: server-sent events).
stream = False

payload = json.dumps({
    "model": "gpt-4",
    "max_tokens": 200,
    "temperature": 0.5,
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "讲个故事,200字."},
    ],
    "stream": stream,
})
headers = {
    # The key must be prefixed with "Bearer ".
    'Authorization': f'Bearer {api_key}',
    'Content-Type': 'application/json',
    # A 'Cookie' header generated by Postman is not required.
}

# A timeout keeps the script from hanging forever on a stalled connection.
response = requests.request(
    "POST", url, headers=headers, data=payload, stream=stream, timeout=120
)
print('类型:', type(response))

if not response.ok:
    # Show the error body returned by the API instead of trying to parse it.
    print(f"请求失败: {response.status_code} {response.text}")
elif not stream:
    print(response.text)
else:
    # Streaming: iterate the response line by line; each line arrives as bytes.
    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        line_decoded = raw_line.decode('utf-8').strip()
        if not line_decoded.startswith('data:'):
            continue
        # Drop the "data:" prefix; the remainder is a JSON document or the
        # end-of-stream sentinel.
        data = line_decoded[5:].strip()
        if data == '[DONE]':
            break
        try:
            json_data = json.loads(data)
        except ValueError:
            print(f"Invalid JSON: {data}")
            continue
        choices = json_data.get('choices') or []
        if choices:
            # Not every delta has a 'content' key, so use .get() throughout.
            print(choices[0].get('delta', {}).get('content') or "", end="")
    print()
3、batch批量请求
- 使用 oaib 库:https://github.com/SpellcraftAI/oaib
import asyncio
import os

from oaib import Auto

# Provide the API key through the environment; replace with your own key.
os.environ['OPENAI_API_KEY'] = 'sk-you_key'
# Confirm the variable is set WITHOUT echoing the secret to the console/logs.
print('OPENAI_API_KEY 是否已设置:', bool(os.getenv('OPENAI_API_KEY')))


async def get_batch():
    """Queue several chat-completion requests with oaib and run them together.

    Returns:
        The object produced by ``batch.run()``. The code below reads its
        ``'result'`` column via ``.tolist()``; presumably this is a pandas
        DataFrame, as described in the oaib README (confirm for your version).
    """
    # `Auto` is meant to set rate limits automatically (per the oaib README,
    # from OpenAI's response headers); `workers` is passed through as-is.
    batch = Auto(workers=8)

    # Queue 5 identical requests; raise the count to send a larger batch.
    for _ in range(5):
        await batch.add(
            "chat.completions.create",
            model="gpt-4",
            max_tokens=200,
            temperature=0.5,
            messages=[{"role": "user", "content": "讲个故事"}],
        )

    resp = await batch.run()
    print('类型:', type(resp))
    print('返回:', resp)
    for item in resp['result'].tolist():
        print(type(item), item)
    return resp


if __name__ == "__main__":
    # get_batch() is a coroutine function: it must be driven by an event loop
    # and cannot be awaited at module level.
    asyncio.run(get_batch())
- aiohttp 和 asyncio 实现异步批量
import asyncio

import aiohttp

# Endpoint to call.
url = "https://api.openai.com/v1/chat/completions"
headers = {
    'Authorization': 'Bearer sk-you_key',  # replace with your own key
    'Content-Type': 'application/json',
}

# Certificate verification stays ON by default because every request carries
# the bearer token. Set to False only as a local workaround for SSL problems
# (e.g. an intercepting proxy you trust).
VERIFY_SSL = True

# Prompts to send; one request body is built per prompt.
data = ['讲个故事', '讲个笑话', '分析股市']
data_list = [
    {
        "model": "gpt-4",
        "max_tokens": 200,
        "temperature": 0.5,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt},
        ],
        "stream": False,
    }
    for prompt in data
]
print(data_list)


async def fetch_data(session, data):
    """POST one request body and return the decoded JSON reply.

    Args:
        session: An open ``aiohttp.ClientSession``.
        data: The JSON-serialisable request body.

    Returns:
        The parsed JSON response, or ``None`` when the request failed
        (the failure is printed, not raised).
    """
    try:
        async with session.post(url, json=data, headers=headers) as response:
            # Raise for any non-2xx status so it is reported below.
            response.raise_for_status()
            return await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # A timeout is not a ClientError; catching it here keeps one slow
        # request from aborting the whole gather().
        print(f"请求失败: {e}")
        return None


async def main():
    """Send every body in ``data_list`` concurrently and print the replies."""
    # ssl=False disables certificate verification; only used when opted in.
    con = aiohttp.TCPConnector() if VERIFY_SSL else aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(connector=con, trust_env=True) as session:
        tasks = [fetch_data(session, body) for body in data_list]
        results = await asyncio.gather(*tasks)
        for result in results:
            # Only failed requests yield None; keep empty-but-valid replies.
            if result is not None:
                print(result)


# Run the main program.
if __name__ == '__main__':
    asyncio.run(main())
4、aiohttp和 asyncio实现异步批量
1、client
import asyncio

import aiohttp

# Endpoint to call.
# NOTE(review): the sample server in this tutorial binds port 8888 while this
# URL uses 8057 — confirm the port (or port mapping) for your deployment.
url = 'http://172.31.208.3:8057/testp'

# Request bodies to send, one POST per entry.
data_list = [
    {"name": "urm1", "data": "post请求数据1"},
    {"name": "urm2", "data": "post请求数据2"},
    {"name": "urm3", "data": "post请求数据3"},
]

# Request headers. Custom entries such as 'User-Agent': 'my-app' or
# 'Authorization': 'Bearer YOUR_TOKEN' can be added here when needed.
headers = {
    'Content-Type': 'application/json',
}


async def fetch_data(session, data):
    """POST one request body and return the decoded JSON reply.

    Args:
        session: An open ``aiohttp.ClientSession``.
        data: The JSON-serialisable request body.

    Returns:
        The parsed JSON response, or ``None`` when the request failed
        (the failure is printed, not raised).
    """
    try:
        async with session.post(url, json=data, headers=headers) as response:
            # Raise for any non-2xx status so it is reported below.
            response.raise_for_status()
            return await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # A timeout is not a ClientError; catching it here keeps one slow
        # request from aborting the whole gather().
        print(f"请求失败: {e}")
        return None


async def main():
    """Send every body in ``data_list`` concurrently and print the replies."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, body) for body in data_list]
        results = await asyncio.gather(*tasks)
        for result in results:
            # Only failed requests yield None; keep empty-but-valid replies.
            if result is not None:
                print(result)


# Run the main program.
if __name__ == '__main__':
    asyncio.run(main())
2、server
from typing import Literal, Optional, Union

import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field

app = FastAPI()


# 1. GET endpoints
@app.get("/")
def read_root():
    """Return a fixed greeting."""
    return {"Hello": "World"}


@app.get("/testg")
def read_test_get(name: str, content: str):
    """Echo the two query parameters back to the caller.

    Named differently from ``read_root`` so the two handlers no longer share
    one function name.
    """
    print('get测试:', name, '和', content)
    return {"name": name, "include": content}


# 2. POST endpoint
class Item(BaseModel):
    """JSON body accepted by ``/testp``."""

    data: str
    name: str


@app.post("/testp")
async def update_item(item: Item):
    """Echo the posted item: its two fields plus the whole model."""
    print('post测试:', item.name, '和', item.data)
    results = {"name": item.name, "data": item.data, "item": item}
    return results


if __name__ == "__main__":
    # 'getpost:app' is an import string: this file must be saved as
    # getpost.py (adjust the module part if you rename it). The import-string
    # form is the one uvicorn recommends with reload=True or multiple workers;
    # passing the `app` object itself only works without them.
    #
    # uvicorn.run() is used here because the auto-reloader is started by
    # run(), not by Server.run().
    uvicorn.run('getpost:app', host='0.0.0.0', port=8888, reload=True, log_level="info")

    # Alternative without auto-reload:
    #   config = uvicorn.Config('getpost:app', host='0.0.0.0', port=8888, log_level="info")
    #   server = uvicorn.Server(config)
    #   server.run()