说明
对之前的东西稍优化了一下,
开了一个load文档的接口。
遇到提问将会按下面步骤执行
1.首先查询向量库,使用工具 search_knowledge_base,有相关内容直接返回
2.如果没有查到相关内容,使用工具 get_time,获取最新时间。
3.通过最新时间再搜索Tavily获取最终结果,用于大模型分析给出答案
代码如下
import os
import time
import uvicorn
from typing import Optional
from pydantic import BaseModel
from langchain_milvus import Milvus
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
from starlette.middleware.cors import CORSMiddleware
from langchain_core.document_loaders import BaseLoader
from fastapi import FastAPI, HTTPException, UploadFile
from langchain.chains.retrieval_qa.base import RetrievalQA
from langchain_community.embeddings import ZhipuAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain.agents import AgentExecutor, create_structured_chat_agentos.environ['LANGSMITH_API_KEY'] = "lsv2_xx"
os.environ["TAVILY_API_KEY"] = "tvly-dexx"
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['DEEPSEEK_API_KEY'] = "sk-exxf"# 大模型
llm = ChatOpenAI(model="deepseek-chat", base_url="https://api.deepseek.com",api_key="sk-e2xx")# 设置embeddings模型
os.environ["ZHIPUAI_API_KEY"] = "a225xx"
embeddings_model = ZhipuAIEmbeddings(model="embedding-3",dimensions=1024
)# Milvus配置,连接使用Milvus
milvus_host = "127.0.0.1" # Milvus服务器的主机名或IP地址
milvus_port = "19530" # Milvus服务器的端口
vector_store = Milvus(embedding_function=embeddings_model,connection_args={"host": milvus_host, "port": milvus_port},auto_id=True, # 设置集合中的数据为自动增长ID,默认是Falsecollection_name="ai_use",
)# 获取检索器,选择 top-2 相关的检索结果
retriever = vector_store.as_retriever(search_type="similarity",search_kwargs={"k": 2,"score_threshold": 0.3, # 降低相似度阈值,增加召回率}
)# 创建自定义的 QA 提示模板
qa_prompt_template = """基于以下上下文信息回答问题。如果上下文中包含相关信息,请直接使用;如果没有相关信息,请明确说明。上下文信息:
{context}问题: {question}请提供详细的回答。"""qa_prompt = PromptTemplate(template=qa_prompt_template,input_variables=["context", "question"]
)qa_chain = RetrievalQA.from_chain_type(llm=llm,chain_type="stuff",retriever=retriever,return_source_documents=True,chain_type_kwargs={"prompt": qa_prompt,"verbose": True}
)# 搜索知识库的工具
@tool("search_knowledge_base")
def search_knowledge_base(query: str) -> str:"""从知识库中搜索相关信息来回答问题。此工具必须作为首选工具使用。"""try:result = qa_chain.invoke({"query": query})result_text = result.get("result", "").lower()print(f"\n调试 - 搜索查询: {query}")print(f"调试 - 原始结果: {result_text}")# 1. 检查结果是否为空或太短if not result.get("result") or len(result["result"].strip()) < 10:return "[未找到相关信息] 知识库中没有找到相关内容。"# 2. 检查是否包含表示未找到信息的关键词negative_patterns = ["未提及","未提到","没有提到","没有找到","没有相关","未找到","无法找到","无法得出","没有关于"]# 如果包含任何表示未找到的词语,返回未找到if any(pattern in result_text for pattern in negative_patterns):return "[未找到相关信息] 知识库中没有找到相关内容。"return f"[找到相关信息] {result['result']}"except Exception as e:print(f"搜索错误: {str(e)}")return f"[搜索错误] {str(e)}"# 获取最新时间的工具
@tool("get_time_func")
def get_time():"""返回当前时间,需要获取实时时间时必须使用这个工具。禁止使用大模型内部的时间"""name: str = "tavily_search_results_json"description: str = "返回当前时间,需要获取实时时间时必须使用这个工具。禁止使用大模型内部的时间"current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())return f"当前时间是:{current_time}"tools = [TavilySearchResults(max_results=1), get_time, search_knowledge_base]## 提示词
system = '''你是一个智能助手。在回答问题时必须严格遵循以下规则:1. 首先检查已有的 Initial Observation 中是否包含知识库搜索结果:- 如果包含 "[找到相关信息]",并且内容相符。直接使用这些信息作为答案,不再通过网络搜索工具获取信息- 如果包含 "[未找到相关信息]",或者找到相关信息,但是主语内容不符合,则使用网络搜索工具获取信息- 如果包含 "[搜索错误]",报告错误并停止有以下工具可以使用:{tools}2. 工具使用规则:- 知识库搜索 (search_knowledge_base):优先使用- 时间查询 (get_time_func):需要获取最新信息,或者需要当前时间时使用,使用该工具获取时间,禁止使用模型内部时间- 网络搜索 (tavily_search_results):仅当知识库无信息时使用
如果问题中包含,'最新','现在'等词,那么必须先使用 get_time_func 工具来查询时间,注意:禁止使用模型内部时间
"Action"的有效取值为: "Final Answer" or {tool_names}每个$JSON_BLOB只提供一个action,如下所示:{{"action": $TOOL_NAME,"action_input": $INPUT}}```遵循此格式:Question: 用户输入的问题Thought: 回答这个问题我需要做些什么,尽可能考虑前面和后面的步骤Action: 回答问题所选取的工具```$JSON_BLOB```Observation: 工具返回的结果... (这个思考/行动/行动输入/观察可以重复N次)Thought: 我现在知道最终答案Action: 工具返回的结果信息```{{"action": "Final Answer","action_input": "原始输入问题的最终答案"}}```开始!提醒始终使用单个操作的有效json blob进行响应。必要时使用工具. 如果合适,直接回应。格式是Action:“$JSON_BLOB”然后是Observation
'''
human = '''{input}{agent_scratchpad}(提醒:无论如何都要在JSON blob中响应!)'''# 修改 agent 的创建
prompt = ChatPromptTemplate.from_messages([("system", system),("user", human),
])agent = create_structured_chat_agent(llm=llm,prompt=prompt,tools=tools
)agent_executor = AgentExecutor(agent=agent,tools=tools,verbose=True,max_iterations=3,handle_parsing_errors=True
)# 构建 FastAPI 应用,提供服务
app = FastAPI()# 可选,前端报CORS时
app.add_middleware(CORSMiddleware,allow_origins=['*'],allow_credentials=True,allow_methods=['*'],allow_headers=['*'],
)# 定义请求模型
class QuestionRequest(BaseModel):question: str# 定义响应模型
class AnswerResponse(BaseModel):answer: str@app.post("/ask", response_model=AnswerResponse)
async def ask_question(request: QuestionRequest):try:user_question = request.questionprint(f"收到问题: {user_question}")# 1. 先执行知识库搜索kb_result = search_knowledge_base.invoke(user_question)print(f"\n知识库搜索结果: {kb_result}")# 2. 构建初始观察initial_scratchpad = f"Initial Observation: {kb_result}"# 3. 使用 agent 处理prompt_inputs = {"input": f"""请回答以下问题,记住必须先检查 Initial Observation 的内容:问题: {user_question}{initial_scratchpad}""","agent_scratchpad": [], # 清空 agent_scratchpad,避免重复# "tools_name": ", ".join([tool.name for tool in tools]) # 添加 tools_name}print("\n发送给 Agent 的输入:")print(prompt_inputs)answer = agent_executor.invoke(prompt_inputs)print("\n最终答案:")print(answer["output"])# 4. 添加调试信息answer["debug_info"] = {"knowledge_base_result": kb_result,"initial_observation": initial_scratchpad,"question": user_question}return AnswerResponse(answer=answer['output'])except Exception as e:print(f"错误: {str(e)}")raise HTTPException(status_code=500, detail=str(e))class MyTextLoader(BaseLoader):def __init__(self,contents,encoding: Optional[str] = None,autodetect_encoding: bool = False,):"""Initialize with file path."""self.contents = contentsself.encoding = encodingself.autodetect_encoding = autodetect_encodingdef lazy_load(self):"""Load from file path."""metadata = {"source": str('upload_file')}yield Document(page_content=self.contents, metadata=metadata)@app.post("/load_document", status_code=201)
async def load_cocument(file: UploadFile):contents = await file.read()loader = MyTextLoader(contents, encoding='utf-8')documents = loader.load()text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=0)texts = text_splitter.split_documents(documents)vector_store.add_documents(texts)return {"message": "文件已成功上传"}if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)
测试搜索到结果的数据
测试搜索不到结果的数据
测试加载文档并搜索
- 加载文档
- 搜索文档相关内容
测试结果看起来没什么问题。可以用一用了。
后续可以把结果保存到redis,然后对会话也进行使用RunnableWithMessageHistory,redis保存等操作。最重要的是提供一个ui交互界面。。。。慢慢来,任重道远、