在现代的 Web 开发中,实时数据传输是一个常见的需求。本文将介绍如何使用 FastAPI 实现一个支持 Streamable HTTP 的 MCP(Model Context Protocol)服务器,并提供 Python 客户端和前端客户端的实现。
1. 什么是 Streamable HTTP 和 MCP?
Streamable HTTP 是一种允许服务器以流的形式向客户端发送数据的技术。这在处理长时间运行的操作或实时数据更新时非常有用。
MCP(Model Context Protocol) 是一种协议,用于在客户端和服务器之间传输模型上下文信息。它支持初始化、消息传输和进度跟踪等功能。
2. 服务器端实现
我们将使用 FastAPI 来实现一个支持 Streamable HTTP 的 MCP 服务器。FastAPI 是一个现代、快速的 Web 框架,基于 Python 类型提示,支持异步操作。
安装 FastAPI 和 Uvicorn
首先,确保你已经安装了 FastAPI 和 Uvicorn。运行以下命令进行安装:
pip install fastapi uvicorn
服务器端代码
以下是服务器端的完整代码:
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import asyncio
import uuidapp = FastAPI()# 配置 CORS
app.add_middleware(CORSMiddleware,allow_origins=["*"], # 允许所有来源allow_credentials=True,allow_methods=["*"], # 允许所有 HTTP 方法allow_headers=["*"], # 允许所有头部expose_headers=["Mcp-Session-Id"], # 允许客户端访问的自定义头
)# 模拟存储会话信息
sessions = {}@app.post("/message")
async def handle_message(request: Request):session_id = request.headers.get("Mcp-Session-Id")if not session_id:session_id = str(uuid.uuid4())# 获取请求数据data = await request.json()print(f"Received message: {data}")# 模拟处理请求并发送响应response_data = {"result": "Request processed"}# 返回 JSON 响应,并在头中包含 Mcp-Session-Idreturn JSONResponse(content={"jsonrpc": "2.0", "id": data.get("id"), "result": response_data},headers={"Mcp-Session-Id": session_id})@app.get("/message")
async def handle_sse(request: Request):session_id = request.headers.get("Mcp-Session-Id") or request.query_params.get("Mcp-Session-Id")if not session_id:raise HTTPException(status_code=400, detail="Session ID is required")async def event_generator():for i in range(5):await asyncio.sleep(1) # 模拟延迟yield f"data: Message {i + 1}\n\n"return StreamingResponse(event_generator(), media_type="text/event-stream")if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
服务器端说明
-
CORS 配置:
- 使用
CORSMiddleware
允许跨域请求,确保前端页面可以访问服务器。 expose_headers
配置项允许客户端访问自定义头Mcp-Session-Id
。
- 使用
-
POST 请求处理:
- 如果客户端未提供
Mcp-Session-Id
,服务器会生成一个新的会话 ID 并返回。 - 服务器处理请求并返回响应,同时在响应头中包含
Mcp-Session-Id
。
- 如果客户端未提供
-
GET 请求处理:
- 服务器通过
StreamingResponse
返回流式数据。 - 每隔 1 秒发送一条消息,模拟实时数据。
- 服务器通过
3. Python 客户端实现
接下来,我们实现一个 Python 客户端,用于与服务器进行交互。
安装依赖
确保你已经安装了 requests
库。如果尚未安装,可以运行以下命令进行安装:
pip install requests
客户端代码
以下是 Python 客户端的完整代码:
import requestsserver_url = "http://127.0.0.1:8000/message"# 发送初始化请求
init_data = {"jsonrpc": "2.0","id": 1,"method": "initialize","params": {"protocolVersion": "2024-11-05","capabilities": {"roots": {"listChanged": True},"sampling": {}},"clientInfo": {"name": "ExampleClient","version": "1.0.0"}}
}
response = requests.post(server_url, json=init_data)
session_id = response.headers.get("Mcp-Session-Id")
print(f"Session ID: {session_id}")# 发送普通请求
request_data = {"jsonrpc": "2.0","id": 2,"method": "some_method","params": {"_meta": {"progressToken": "abc123"}}
}
response = requests.post(server_url, json=request_data, headers={"Mcp-Session-Id": session_id})
print(f"Response: {response.json()}")# 监听 SSE 流
print("Listening for SSE messages...")
print("Mcp-Session-Id", session_id)
with requests.get(server_url, headers={"Mcp-Session-Id": session_id}, stream=True) as response:for line in response.iter_lines():if line:decoded_line = line.decode("utf-8")print(f"SSE Message: {decoded_line}")
客户端说明
-
初始化请求:
- 发送初始化请求,获取
Mcp-Session-Id
。 - 将
Mcp-Session-Id
保存在变量中,用于后续请求。
- 发送初始化请求,获取
-
发送普通请求:
- 在请求头中包含
Mcp-Session-Id
,发送普通请求并获取响应。
- 在请求头中包含
-
监听 SSE 流:
- 使用
requests.get
的stream=True
参数,监听服务器发送的 SSE 流。 - 实时打印接收到的消息。
- 使用
4. 前端客户端实现
最后,我们实现一个前端页面,用于与服务器进行交互并实时展示数据。
前端代码
以下是前端页面的完整代码:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>MCP Client</title><style>body {font-family: Arial, sans-serif;margin: 20px;background-color: #f4f4f4;color: #333;}.container {max-width: 800px;margin: 0 auto;padding: 20px;background: #fff;border-radius: 8px;box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);}h1 {text-align: center;color: #2c3e50;}.buttons {text-align: center;margin-bottom: 20px;}button {padding: 10px 20px;margin: 0 10px;font-size: 16px;color: #fff;background-color: #3498db;border: none;border-radius: 4px;cursor: pointer;transition: background-color 0.3s;}button:hover {background-color: #2980b9;}.messages {margin-top: 20px;padding: 10px;background: #ecf0f1;border-radius: 4px;}.message {margin-bottom: 10px;padding: 10px;background: #bdc3c7;border-radius: 4px;color: #2c3e50;}</style>
</head>
<body><div class="container"><h1>MCP Client</h1><div class="buttons"><button id="initButton">Initialize</button><button id="sendMessageButton">Send Message</button></div><div class="messages" id="messages"></div></div><script>const serverUrl = "http://127.0.0.1:8000/message";let sessionId = null;let eventSource = null;// 初始化按钮点击事件document.getElementById('initButton').addEventListener('click', async () => {const initData = {"jsonrpc": "2.0","id": 1,"method": "initialize","params": {"protocolVersion": "2024-11-05","capabilities": {"roots": {"listChanged": true},"sampling": {}},"clientInfo": {"name": "ExampleClient","version": "1.0.0"}}};const response = await fetch(serverUrl, {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify(initData)});if (response.ok) {const data = await response.json();sessionId = response.headers.get('Mcp-Session-Id');displayMessage(`Session ID: ${sessionId}`);startSSE();} else {displayMessage(`Initialization failed: ${response.statusText}`);}});// 发送消息按钮点击事件document.getElementById('sendMessageButton').addEventListener('click', async () => {if (!sessionId) {displayMessage("Please initialize first.");return;}const messageData = {"jsonrpc": "2.0","id": 2,"method": "some_method","params": {"_meta": {"progressToken": "abc123"}}};const response = await fetch(serverUrl, {method: 'POST',headers: {'Content-Type': 'application/json','Mcp-Session-Id': sessionId},body: JSON.stringify(messageData)});if (response.ok) {const data = await response.json();displayMessage(`Response: ${JSON.stringify(data)}`);} else {displayMessage(`Message sending failed: ${response.statusText}`);}});// 启动 SSE 流function startSSE() {if (!sessionId) {displayMessage("Please initialize first.");return;}if (eventSource) {eventSource.close();}eventSource = new EventSource(`${serverUrl}?Mcp-Session-Id=${sessionId}`);eventSource.onmessage = (event) => {displayMessage(`SSE Message: ${event.data}`);};eventSource.onerror = (error) => {displayMessage(`SSE Error: ${error.message}`);eventSource.close();};}// 显示消息function displayMessage(message) {const messagesDiv = document.getElementById('messages');const messageElement = document.createElement('div');messageElement.className = 'message';messageElement.textContent = message;messagesDiv.appendChild(messageElement);}</script>
</body>
</html>
前端说明
-
初始化按钮点击事件:
- 发送初始化请求,获取
Mcp-Session-Id
。 - 将
Mcp-Session-Id
保存在变量中,用于后续请求。
- 发送初始化请求,获取
-
发送消息按钮点击事件:
- 在请求头中包含
Mcp-Session-Id
,发送普通请求并获取响应。
- 在请求头中包含
-
监听 SSE 流:
- 使用
EventSource
监听服务器发送的 SSE 流。 - 每当接收到新的消息时,调用
displayMessage
函数动态展示消息。
- 使用
-
显示消息:
- 动态创建一个新的
div
元素,并将其添加到页面的messages
区域。
- 动态创建一个新的
5. 运行步骤
启动服务器
将服务器端代码保存为 server.py
,然后运行以下命令启动服务器:
uvicorn server:app --reload
运行 Python 客户端
将 Python 客户端代码保存为 client.py
,然后运行以下命令启动客户端:
python client.py
运行效果:
(.venv) (base) ➜ pop git:(main) ✗ python client.py
Session ID: 587bb6ad-08f5-4102-8b27-4c276e9d7815
Response: {'jsonrpc': '2.0', 'id': 2, 'result': {'result': 'Request processed'}}
Listening for SSE messages...
Mcp-Session-Id 587bb6ad-08f5-4102-8b27-4c276e9d7815
SSE Message: data: Message 1
SSE Message: data: Message 2
SSE Message: data: Message 3
SSE Message: data: Message 4
SSE Message: data: Message 5
运行前端页面
将前端代码保存为 index.html
,然后在浏览器中打开该文件。
操作步骤
- 点击“Initialize”按钮,初始化会话并获取
Mcp-Session-Id
。 - 点击“Send Message”按钮,发送普通请求并查看服务器的响应。
- 页面会自动监听 SSE 流,并实时显示服务器发送的实时消息。
运行效果:
6. 调试
如果遇到问题,可以使用以下方法进行调试:
-
检查服务器日志:
- 查看服务器日志,确认是否生成了
Mcp-Session-Id
并返回给客户端。 - 确认
StreamingResponse
是否正确返回了流式数据。
- 查看服务器日志,确认是否生成了
-
检查浏览器开发者工具:
- 打开浏览器的开发者工具(F12),查看网络请求的响应头,确认是否包含
Mcp-Session-Id
。 - 查看
EventSource
的网络请求,确认是否正确接收到了流式数据。
- 打开浏览器的开发者工具(F12),查看网络请求的响应头,确认是否包含
-
检查跨域问题:
- 确保服务器正确配置了 CORS,允许前端页面的域名和端口。
- 确保
expose_headers
配置项正确,允许客户端访问自定义头Mcp-Session-Id
。
通过以上步骤,你可以实现一个支持 Streamable HTTP 的 MCP 服务器,并通过 Python 客户端和前端页面与服务器进行交互。希望这篇文章对你有所帮助