在现代的 Web 开发中,实时数据传输是一个常见的需求。本文将介绍如何使用 FastAPI 实现一个支持 Streamable HTTP 的 MCP(Model Context Protocol)服务器,并提供 Python 客户端和前端客户端的实现。
1. 什么是 Streamable HTTP 和 MCP?
Streamable HTTP 是一种允许服务器以流的形式向客户端发送数据的技术。这在处理长时间运行的操作或实时数据更新时非常有用。
MCP(Model Context Protocol) 是一种协议,用于在客户端和服务器之间传输模型上下文信息。它支持初始化、消息传输和进度跟踪等功能。
2. 服务器端实现
我们将使用 FastAPI 来实现一个支持 Streamable HTTP 的 MCP 服务器。FastAPI 是一个现代、快速的 Web 框架,基于 Python 类型提示,支持异步操作。
安装 FastAPI 和 Uvicorn
首先,确保你已经安装了 FastAPI 和 Uvicorn。运行以下命令进行安装:
pip install fastapi uvicorn
服务器端代码
以下是服务器端的完整代码:
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import asyncio
import uuid
app = FastAPI()
# 配置 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有来源
allow_credentials=True,
allow_methods=["*"], # 允许所有 HTTP 方法
allow_headers=["*"], # 允许所有头部
expose_headers=["Mcp-Session-Id"], # 允许客户端访问的自定义头
)
# 模拟存储会话信息
sessions = {}
@app.post("/message")
async def handle_message(request: Request):
session_id = request.headers.get("Mcp-Session-Id")
if not session_id:
session_id = str(uuid.uuid4())
# 获取请求数据
data = await request.json()
print(f"Received message: {data}")
# 模拟处理请求并发送响应
response_data = {"result": "Request processed"}
# 返回 JSON 响应,并在头中包含 Mcp-Session-Id
return JSONResponse(content={"jsonrpc": "2.0", "id": data.get("id"), "result": response_data},
headers={"Mcp-Session-Id": session_id})
@app.get("/message")
async def handle_sse(request: Request):
session_id = request.headers.get("Mcp-Session-Id") or request.query_params.get("Mcp-Session-Id")
if not session_id:
raise HTTPException(status_code=400, detail="Session ID is required")
async def event_generator():
for i in range(5):
await asyncio.sleep(1) # 模拟延迟
yield f"data: Message {i + 1}\\n\\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
服务器端说明
CORS 配置:
- 使用 CORSMiddleware 允许跨域请求,确保前端页面可以访问服务器。
- expose_headers 配置项允许客户端访问自定义头 Mcp-Session-Id。
POST 请求处理:
- 如果客户端未提供 Mcp-Session-Id,服务器会生成一个新的会话 ID 并返回。
- 服务器处理请求并返回响应,同时在响应头中包含 Mcp-Session-Id。
GET 请求处理:
- 服务器通过 StreamingResponse 返回流式数据。
- 每隔 1 秒发送一条消息,模拟实时数据。
3. Python 客户端实现
接下来,我们实现一个 Python 客户端,用于与服务器进行交互。
安装依赖
确保你已经安装了 requests 库。如果尚未安装,可以运行以下命令进行安装:
pip install requests
客户端代码
以下是 Python 客户端的完整代码:
import requests
server_url = "http://127.0.0.1:8000/message"
# 发送初始化请求
init_data = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {
"roots": {
"listChanged": True
},
"sampling": {}
},
"clientInfo": {
"name": "ExampleClient",
"version": "1.0.0"
}
}
}
response = requests.post(server_url, json=init_data)
session_id = response.headers.get("Mcp-Session-Id")
print(f"Session ID: {session_id}")
# 发送普通请求
request_data = {
"jsonrpc": "2.0",
"id": 2,
"method": "some_method",
"params": {
"_meta": {
"progressToken": "abc123"
}
}
}
response = requests.post(server_url, json=request_data, headers={"Mcp-Session-Id": session_id})
print(f"Response: {response.json()}")
# 监听 SSE 流
print("Listening for SSE messages…")
print("Mcp-Session-Id", session_id)
with requests.get(server_url, headers={"Mcp-Session-Id": session_id}, stream=True) as response:
for line in response.iter_lines():
if line:
decoded_line = line.decode("utf-8")
print(f"SSE Message: {decoded_line}")
客户端说明
初始化请求:
- 发送初始化请求,获取 Mcp-Session-Id。
- 将 Mcp-Session-Id 保存在变量中,用于后续请求。
发送普通请求:
- 在请求头中包含 Mcp-Session-Id,发送普通请求并获取响应。
监听 SSE 流:
- 使用 requests.get 的 stream=True 参数,监听服务器发送的 SSE 流。
- 实时打印接收到的消息。
4. 前端客户端实现
最后,我们实现一个前端页面,用于与服务器进行交互并实时展示数据。
前端代码
以下是前端页面的完整代码:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>MCP Client</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
background-color: #f4f4f4;
color: #333;
}
.container {
max-width: 800px;
margin: 0 auto;
padding: 20px;
background: #fff;
border-radius: 8px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
}
h1 {
text-align: center;
color: #2c3e50;
}
.buttons {
text-align: center;
margin-bottom: 20px;
}
button {
padding: 10px 20px;
margin: 0 10px;
font-size: 16px;
color: #fff;
background-color: #3498db;
border: none;
border-radius: 4px;
cursor: pointer;
transition: background-color 0.3s;
}
button:hover {
background-color: #2980b9;
}
.messages {
margin-top: 20px;
padding: 10px;
background: #ecf0f1;
border-radius: 4px;
}
.message {
margin-bottom: 10px;
padding: 10px;
background: #bdc3c7;
border-radius: 4px;
color: #2c3e50;
}
</style>
</head>
<body>
<div class="container">
<h1>MCP Client</h1>
<div class="buttons">
<button id="initButton">Initialize</button>
<button id="sendMessageButton">Send Message</button>
</div>
<div class="messages" id="messages"></div>
</div>
<script>
const serverUrl = "http://127.0.0.1:8000/message";
let sessionId = null;
let eventSource = null;
// 初始化按钮点击事件
document.getElementById('initButton').addEventListener('click', async () => {
const initData = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {
"roots": {
"listChanged": true
},
"sampling": {}
},
"clientInfo": {
"name": "ExampleClient",
"version": "1.0.0"
}
}
};
const response = await fetch(serverUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(initData)
});
if (response.ok) {
const data = await response.json();
sessionId = response.headers.get('Mcp-Session-Id');
displayMessage(`Session ID: ${sessionId}`);
startSSE();
} else {
displayMessage(`Initialization failed: ${response.statusText}`);
}
});
// 发送消息按钮点击事件
document.getElementById('sendMessageButton').addEventListener('click', async () => {
if (!sessionId) {
displayMessage("Please initialize first.");
return;
}
const messageData = {
"jsonrpc": "2.0",
"id": 2,
"method": "some_method",
"params": {
"_meta": {
"progressToken": "abc123"
}
}
};
const response = await fetch(serverUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Mcp-Session-Id': sessionId
},
body: JSON.stringify(messageData)
});
if (response.ok) {
const data = await response.json();
displayMessage(`Response: ${JSON.stringify(data)}`);
} else {
displayMessage(`Message sending failed: ${response.statusText}`);
}
});
// 启动 SSE 流
function startSSE() {
if (!sessionId) {
displayMessage("Please initialize first.");
return;
}
if (eventSource) {
eventSource.close();
}
eventSource = new EventSource(`${serverUrl}?Mcp-Session-Id=${sessionId}`);
eventSource.onmessage = (event) => {
displayMessage(`SSE Message: ${event.data}`);
};
eventSource.onerror = (error) => {
displayMessage(`SSE Error: ${error.message}`);
eventSource.close();
};
}
// 显示消息
function displayMessage(message) {
const messagesDiv = document.getElementById('messages');
const messageElement = document.createElement('div');
messageElement.className = 'message';
messageElement.textContent = message;
messagesDiv.appendChild(messageElement);
}
</script>
</body>
</html>
前端说明
初始化按钮点击事件:
- 发送初始化请求,获取 Mcp-Session-Id。
- 将 Mcp-Session-Id 保存在变量中,用于后续请求。
发送消息按钮点击事件:
- 在请求头中包含 Mcp-Session-Id,发送普通请求并获取响应。
监听 SSE 流:
- 使用 EventSource 监听服务器发送的 SSE 流。
- 每当接收到新的消息时,调用 displayMessage 函数动态展示消息。
显示消息:
- 动态创建一个新的 div 元素,并将其添加到页面的 messages 区域。
5. 运行步骤
启动服务器
将服务器端代码保存为 server.py,然后运行以下命令启动服务器:
uvicorn server:app –reload
运行 Python 客户端
将 Python 客户端代码保存为 client.py,然后运行以下命令启动客户端:
python client.py
运行效果:
(.venv) (base) ➜ pop git:(main) ✗ python client.py
Session ID: 587bb6ad-08f5-4102-8b27-4c276e9d7815
Response: {'jsonrpc': '2.0', 'id': 2, 'result': {'result': 'Request processed'}}
Listening for SSE messages...
Mcp-Session-Id 587bb6ad-08f5-4102-8b27-4c276e9d7815
SSE Message: data: Message 1
SSE Message: data: Message 2
SSE Message: data: Message 3
SSE Message: data: Message 4
SSE Message: data: Message 5
运行前端页面
将前端代码保存为 index.html,然后在浏览器中打开该文件。
操作步骤
运行效果:
6. 调试
如果遇到问题,可以使用以下方法进行调试:
检查服务器日志:
- 查看服务器日志,确认是否生成了 Mcp-Session-Id 并返回给客户端。
- 确认 StreamingResponse 是否正确返回了流式数据。
检查浏览器开发者工具:
- 打开浏览器的开发者工具(F12),查看网络请求的响应头,确认是否包含 Mcp-Session-Id。
- 查看 EventSource 的网络请求,确认是否正确接收到了流式数据。
检查跨域问题:
- 确保服务器正确配置了 CORS,允许前端页面的域名和端口。
- 确保 expose_headers 配置项正确,允许客户端访问自定义头 Mcp-Session-Id。
通过以上步骤,你可以实现一个支持 Streamable HTTP 的 MCP 服务器,并通过 Python 客户端和前端页面与服务器进行交互。希望这篇文章对你有所帮助
评论前必须登录!
注册