您的位置:首页 > 教育 > 培训 > Openai api使用

Openai api使用

2024/10/5 17:28:05 来源:https://blog.csdn.net/weixin_44986037/article/details/141226434  浏览:    关键词:Openai api使用

Openai api使用

  • 1、文档
  • 2、单个请求
  • 3、batch批量请求
  • 4、aiohttp和 asyncio实现异步批量

1、文档

openai:
官方文档:https://platform.openai.com/docs/quickstart
官方python文档:https://github.com/openai/openai-python
batch批量请求:

  • https://platform.openai.com/docs/guides/batch/overview
  • https://github.com/SpellcraftAI/oaib

2、单个请求

  1. client.chat.completions
    import os# 设置环境变量
    os.environ['OPENAI_API_KEY'] = 'you_key' #用你的openai key
    # 验证环境变量是否设置成功
    print(os.getenv('OPENAI_API_KEY'))from openai import OpenAI
    client = OpenAI()
    stream=True
    completion = client.chat.completions.create(model="gpt-4",max_tokens=200,temperature=0.5,messages=[# {"role": "system", "content": "You are a helpful assistant."},# {#     "role": "user",#     "content": "Write a haiku about recursion in programming."# },{"role": "system", "content": "你是一位作家."},{"role": "user","content": "写一篇小说,关于猫的."}],stream=stream
    )if stream:output_text = ''for chunk in completion:# print(chunk.choices[0].delta.content,end='')print(chunk.choices[0].delta.content or "", end="")content= chunk.choices[0].delta.contentif content:  # 检查 content 是否为 Noneoutput_text +=contentprint('\n','*'*100)print('接收到内容:\n',output_text)else:print(completion)print(completion.choices[0].message)
    

2.requests url的方式,参考官方的curl,可先在postman上试

import requests
import jsonurl = "https://api.openai.com/v1/chat/completions"payload = json.dumps({"model": "gpt-4","max_tokens":200,"temperature":0.5,"messages": [{"role": "system","content": "You are a helpful assistant."},{"role": "user","content": "讲个故事,200字."}],"stream": False,# "response_format":"json"
})
headers = {'Authorization': 'Bearer sk-you_key', ##用你的openai key,前面加上Bearer 'Content-Type': 'application/json',# 'Cookie': '__' #postman生成,或者不要
}response = requests.request("POST", url, headers=headers, data=payload,stream=True)print('类型:',type(response))
print(response.text)#流
# # 遍历响应内容,要转换byte
# for line in response.iter_lines():
#     if line:
#         # 对每一行进行解码并处理
#         line_decoded = line.decode('utf-8').strip()
#         if line_decoded.startswith('data:'):
#             # 去掉 "data: " 前缀并解析为 JSON
#             data = line_decoded[5:].strip()
#             try:
#                 json_data = json.loads(data)
#                 print(json_data)
#                 # print(json_data['choices'][0]['delta']['content']or "", end="")
#                 print(json_data['choices'][0]['delta']['content'] or "", end="")
#             except ValueError as e:
#                 print(f"Invalid JSON: {data}")

3、batch批量请求

  1. 使用oaib库,https://github.com/SpellcraftAI/oaib

    from oaib import Auto
    import asyncio
    import os
    # 设置环境变量
    os.environ['OPENAI_API_KEY'] = 'sk-you_key'
    # 验证环境变量是否设置成功
    print(os.getenv('OPENAI_API_KEY'))async def get_batch():# Automatically set rate limits.batch = Auto(workers=8)# Fetch 1,000 chat completions as quickly as possible, setting rate limits# automatically from OpenAI's response headers.for i in range(5):await batch.add("chat.completions.create", model="gpt-4",max_tokens=200,temperature=0.5,messages=[{"role": "user", "content": "讲个故事"}])resp= await batch.run()print('类型:',type(resp))print('返回:',resp)for i in resp['result'].tolist():print(type(i),i)# return batchif __name__ == "__main__":# batch=get_batch()# await batch.run()# get_batch()asyncio.run(get_batch())
    
  2. aiohttp和 asyncio实现异步批量

import aiohttp
import asyncio# 要请求的URL
url = "https://api.openai.com/v1/chat/completions"headers = {'Authorization': 'Bearer sk-you_key','Content-Type': 'application/json',# 'Cookie': '__cf_bm=fGp5'
}# 要发送的数据
# data_list = [{"name":"urm1","data":"post请求数据1"},{"name":"urm2","data":"post请求数据2"},{"name":"urm3","data":"post请求数据3"}]data=['讲个故事','讲个笑话','分析股市']
# data_list=[ item['messages'][0]['content']==i for i in data]
data_list=[ {"model": "gpt-4","max_tokens":200,"temperature":0.5,"messages": [{"role": "system","content": "You are a helpful assistant."},{"role": "user","content": i}],"stream": False,# "response_format":"json"
} for i in data]print(data_list)
# quit('测试')async def fetch_data(session, data):try:async with session.post(url, json=data, headers=headers) as response:response.raise_for_status() # 确保响应状态码为2xx,否则引发HTTPErrorreturn await response.json()except aiohttp.ClientError as e:print(f"请求失败: {e}")return Noneasync def main():con = aiohttp.TCPConnector(ssl=False) #ssl问题async with aiohttp.ClientSession(connector=con, trust_env=True) as session:tasks = [fetch_data(session, data) for data in data_list]results = await asyncio.gather(*tasks)for result in results:if result:print(result)# 运行主程序
if __name__ == '__main__':asyncio.run(main())

4、aiohttp和 asyncio实现异步批量

1、client

import aiohttp
import asyncio# 要请求的URL
url = 'http://172.31.208.3:8057/testp'# 要发送的数据
data_list = [{"name":"urm1","data":"post请求数据1"},{"name":"urm2","data":"post请求数据2"},{"name":"urm3","data":"post请求数据3"}]# {"name":"urm1","data":"post请求数据1"}# 自定义请求头
# headers = {
#     'User-Agent': 'my-app',
#     'Authorization': 'Bearer YOUR_TOKEN'
# }
headers = {'Content-Type': 'application/json'
}async def fetch_data(session, data):try:async with session.post(url, json=data, headers=headers) as response:response.raise_for_status() # 确保响应状态码为2xx,否则引发HTTPErrorreturn await response.json()except aiohttp.ClientError as e:print(f"请求失败: {e}")return Noneasync def main():async with aiohttp.ClientSession() as session:tasks = [fetch_data(session, data) for data in data_list]results = await asyncio.gather(*tasks)for result in results:if result:print(result)# 运行主程序
if __name__ == '__main__':asyncio.run(main())

2、server


from typing import Unionfrom fastapi import FastAPI
import uvicorn
from pydantic import BaseModel,Field
from typing import Union,Optional,Literalapp = FastAPI()# 1、get请求@app.get("/")
def read_root():return {"Hello": "World"}@app.get("/testg")
def read_root(name: str, content: str):print('get测试:',name,'和',content)return {"name":name, "include":content}class Item(BaseModel):data: strname: str@app.post("/testp")
async def update_item(item: Item):print('post测试:',item.name,'和',item.data)results = {"name":item.name, "data":item.data,"item":item}return resultsif __name__ == "__main__":#方法一:# config = uvicorn.Config("main:app", host='0.0.0.0',port=8888, reload=True, log_level="info") #指定模块,当前用FastAPI()的appconfig = uvicorn.Config('getpost:app',host='0.0.0.0',port=8888, reload=True, log_level="info")# config = uvicorn.Config(app,host='0.0.0.0',port=8888, reload=True, log_level="info") #用app,只有在不使用多处理(worker=NUM)或重新加载(reload=True)的情况下,此样式才有效,因此我们建议使用导入字符串样式server = uvicorn.Server(config)server.run()#方法二:# from pathlib import Path# uvicorn.run('getpost:app',host='0.0.0.0',port=8888, reload=True, log_level="info")

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com