云计算百科
云计算领域专业知识百科平台

实现一个支持 Streamable HTTP 的 MCP 服务器及客户端

在现代的 Web 开发中,实时数据传输是一个常见的需求。本文将介绍如何使用 FastAPI 实现一个支持 Streamable HTTP 的 MCP(Model Context Protocol)服务器,并提供 Python 客户端和前端客户端的实现。

1. 什么是 Streamable HTTP 和 MCP?

Streamable HTTP 是一种允许服务器以流的形式向客户端发送数据的技术。这在处理长时间运行的操作或实时数据更新时非常有用。

MCP(Model Context Protocol) 是一种协议,用于在客户端和服务器之间传输模型上下文信息。它支持初始化、消息传输和进度跟踪等功能。

2. 服务器端实现

我们将使用 FastAPI 来实现一个支持 Streamable HTTP 的 MCP 服务器。FastAPI 是一个现代、快速的 Web 框架,基于 Python 类型提示,支持异步操作。

安装 FastAPI 和 Uvicorn

首先,确保你已经安装了 FastAPI 和 Uvicorn。运行以下命令进行安装:

pip install fastapi uvicorn

服务器端代码

以下是服务器端的完整代码:

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import asyncio
import uuid

app = FastAPI()

# 配置 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有来源
allow_credentials=True,
allow_methods=["*"], # 允许所有 HTTP 方法
allow_headers=["*"], # 允许所有头部
expose_headers=["Mcp-Session-Id"], # 允许客户端访问的自定义头
)

# 模拟存储会话信息
sessions = {}

@app.post("/message")
async def handle_message(request: Request):
session_id = request.headers.get("Mcp-Session-Id")
if not session_id:
session_id = str(uuid.uuid4())

# 获取请求数据
data = await request.json()
print(f"Received message: {data}")

# 模拟处理请求并发送响应
response_data = {"result": "Request processed"}

# 返回 JSON 响应,并在头中包含 Mcp-Session-Id
return JSONResponse(content={"jsonrpc": "2.0", "id": data.get("id"), "result": response_data},
headers={"Mcp-Session-Id": session_id})

@app.get("/message")
async def handle_sse(request: Request):
session_id = request.headers.get("Mcp-Session-Id") or request.query_params.get("Mcp-Session-Id")
if not session_id:
raise HTTPException(status_code=400, detail="Session ID is required")

async def event_generator():
for i in range(5):
await asyncio.sleep(1) # 模拟延迟
yield f"data: Message {i + 1}\\n\\n"

return StreamingResponse(event_generator(), media_type="text/event-stream")

if __name__ == "__main__":
import uvicorn

uvicorn.run(app, host="0.0.0.0", port=8000)

服务器端说明

  • CORS 配置:

    • 使用 CORSMiddleware 允许跨域请求,确保前端页面可以访问服务器。
    • expose_headers 配置项允许客户端访问自定义头 Mcp-Session-Id。
  • POST 请求处理:

    • 如果客户端未提供 Mcp-Session-Id,服务器会生成一个新的会话 ID 并返回。
    • 服务器处理请求并返回响应,同时在响应头中包含 Mcp-Session-Id。
  • GET 请求处理:

    • 服务器通过 StreamingResponse 返回流式数据。
    • 每隔 1 秒发送一条消息,模拟实时数据。
  • 3. Python 客户端实现

    接下来,我们实现一个 Python 客户端,用于与服务器进行交互。

    安装依赖

    确保你已经安装了 requests 库。如果尚未安装,可以运行以下命令进行安装:

    pip install requests

    客户端代码

    以下是 Python 客户端的完整代码:

    import requests

    server_url = "http://127.0.0.1:8000/message"

    # 发送初始化请求
    init_data = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
    "roots": {
    "listChanged": True
    },
    "sampling": {}
    },
    "clientInfo": {
    "name": "ExampleClient",
    "version": "1.0.0"
    }
    }
    }
    response = requests.post(server_url, json=init_data)
    session_id = response.headers.get("Mcp-Session-Id")
    print(f"Session ID: {session_id}")

    # 发送普通请求
    request_data = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "some_method",
    "params": {
    "_meta": {
    "progressToken": "abc123"
    }
    }
    }
    response = requests.post(server_url, json=request_data, headers={"Mcp-Session-Id": session_id})
    print(f"Response: {response.json()}")

    # 监听 SSE 流
    print("Listening for SSE messages…")
    print("Mcp-Session-Id", session_id)
    with requests.get(server_url, headers={"Mcp-Session-Id": session_id}, stream=True) as response:
    for line in response.iter_lines():
    if line:
    decoded_line = line.decode("utf-8")
    print(f"SSE Message: {decoded_line}")

    客户端说明

  • 初始化请求:

    • 发送初始化请求,获取 Mcp-Session-Id。
    • 将 Mcp-Session-Id 保存在变量中,用于后续请求。
  • 发送普通请求:

    • 在请求头中包含 Mcp-Session-Id,发送普通请求并获取响应。
  • 监听 SSE 流:

    • 使用 requests.get 的 stream=True 参数,监听服务器发送的 SSE 流。
    • 实时打印接收到的消息。
  • 4. 前端客户端实现

    最后,我们实现一个前端页面,用于与服务器进行交互并实时展示数据。

    前端代码

    以下是前端页面的完整代码:

    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MCP Client</title>
    <style>
    body {
    font-family: Arial, sans-serif;
    margin: 20px;
    background-color: #f4f4f4;
    color: #333;
    }
    .container {
    max-width: 800px;
    margin: 0 auto;
    padding: 20px;
    background: #fff;
    border-radius: 8px;
    box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
    }
    h1 {
    text-align: center;
    color: #2c3e50;
    }
    .buttons {
    text-align: center;
    margin-bottom: 20px;
    }
    button {
    padding: 10px 20px;
    margin: 0 10px;
    font-size: 16px;
    color: #fff;
    background-color: #3498db;
    border: none;
    border-radius: 4px;
    cursor: pointer;
    transition: background-color 0.3s;
    }
    button:hover {
    background-color: #2980b9;
    }
    .messages {
    margin-top: 20px;
    padding: 10px;
    background: #ecf0f1;
    border-radius: 4px;
    }
    .message {
    margin-bottom: 10px;
    padding: 10px;
    background: #bdc3c7;
    border-radius: 4px;
    color: #2c3e50;
    }
    </style>
    </head>
    <body>
    <div class="container">
    <h1>MCP Client</h1>
    <div class="buttons">
    <button id="initButton">Initialize</button>
    <button id="sendMessageButton">Send Message</button>
    </div>
    <div class="messages" id="messages"></div>
    </div>

    <script>
    const serverUrl = "http://127.0.0.1:8000/message";
    let sessionId = null;
    let eventSource = null;

    // 初始化按钮点击事件
    document.getElementById('initButton').addEventListener('click', async () => {
    const initData = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
    "roots": {
    "listChanged": true
    },
    "sampling": {}
    },
    "clientInfo": {
    "name": "ExampleClient",
    "version": "1.0.0"
    }
    }
    };

    const response = await fetch(serverUrl, {
    method: 'POST',
    headers: {
    'Content-Type': 'application/json'
    },
    body: JSON.stringify(initData)
    });

    if (response.ok) {
    const data = await response.json();
    sessionId = response.headers.get('Mcp-Session-Id');
    displayMessage(`Session ID: ${sessionId}`);
    startSSE();
    } else {
    displayMessage(`Initialization failed: ${response.statusText}`);
    }
    });

    // 发送消息按钮点击事件
    document.getElementById('sendMessageButton').addEventListener('click', async () => {
    if (!sessionId) {
    displayMessage("Please initialize first.");
    return;
    }

    const messageData = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "some_method",
    "params": {
    "_meta": {
    "progressToken": "abc123"
    }
    }
    };

    const response = await fetch(serverUrl, {
    method: 'POST',
    headers: {
    'Content-Type': 'application/json',
    'Mcp-Session-Id': sessionId
    },
    body: JSON.stringify(messageData)
    });

    if (response.ok) {
    const data = await response.json();
    displayMessage(`Response: ${JSON.stringify(data)}`);
    } else {
    displayMessage(`Message sending failed: ${response.statusText}`);
    }
    });

    // 启动 SSE 流
    function startSSE() {
    if (!sessionId) {
    displayMessage("Please initialize first.");
    return;
    }

    if (eventSource) {
    eventSource.close();
    }

    eventSource = new EventSource(`${serverUrl}?Mcp-Session-Id=${sessionId}`);
    eventSource.onmessage = (event) => {
    displayMessage(`SSE Message: ${event.data}`);
    };

    eventSource.onerror = (error) => {
    displayMessage(`SSE Error: ${error.message}`);
    eventSource.close();
    };
    }

    // 显示消息
    function displayMessage(message) {
    const messagesDiv = document.getElementById('messages');
    const messageElement = document.createElement('div');
    messageElement.className = 'message';
    messageElement.textContent = message;
    messagesDiv.appendChild(messageElement);
    }
    </script>
    </body>
    </html>

    前端说明

  • 初始化按钮点击事件:

    • 发送初始化请求,获取 Mcp-Session-Id。
    • 将 Mcp-Session-Id 保存在变量中,用于后续请求。
  • 发送消息按钮点击事件:

    • 在请求头中包含 Mcp-Session-Id,发送普通请求并获取响应。
  • 监听 SSE 流:

    • 使用 EventSource 监听服务器发送的 SSE 流。
    • 每当接收到新的消息时,调用 displayMessage 函数动态展示消息。
  • 显示消息:

    • 动态创建一个新的 div 元素,并将其添加到页面的 messages 区域。
  • 5. 运行步骤

    启动服务器

    将服务器端代码保存为 server.py,然后运行以下命令启动服务器:

    uvicorn server:app –reload

    运行 Python 客户端

    将 Python 客户端代码保存为 client.py,然后运行以下命令启动客户端:

    python client.py

    运行效果:

    (.venv) (base) ➜ pop git:(main) ✗ python client.py
    Session ID: 587bb6ad-08f5-4102-8b27-4c276e9d7815
    Response: {'jsonrpc': '2.0', 'id': 2, 'result': {'result': 'Request processed'}}
    Listening for SSE messages...
    Mcp-Session-Id 587bb6ad-08f5-4102-8b27-4c276e9d7815
    SSE Message: data: Message 1
    SSE Message: data: Message 2
    SSE Message: data: Message 3
    SSE Message: data: Message 4
    SSE Message: data: Message 5

    运行前端页面

    将前端代码保存为 index.html,然后在浏览器中打开该文件。

    操作步骤

  • 点击“Initialize”按钮,初始化会话并获取 Mcp-Session-Id。
  • 点击“Send Message”按钮,发送普通请求并查看服务器的响应。
  • 页面会自动监听 SSE 流,并实时显示服务器发送的实时消息。
  • 运行效果: 在这里插入图片描述

    6. 调试

    如果遇到问题,可以使用以下方法进行调试:

  • 检查服务器日志:

    • 查看服务器日志,确认是否生成了 Mcp-Session-Id 并返回给客户端。
    • 确认 StreamingResponse 是否正确返回了流式数据。
  • 检查浏览器开发者工具:

    • 打开浏览器的开发者工具(F12),查看网络请求的响应头,确认是否包含 Mcp-Session-Id。
    • 查看 EventSource 的网络请求,确认是否正确接收到了流式数据。
  • 检查跨域问题:

    • 确保服务器正确配置了 CORS,允许前端页面的域名和端口。
    • 确保 expose_headers 配置项正确,允许客户端访问自定义头 Mcp-Session-Id。
  • 通过以上步骤,你可以实现一个支持 Streamable HTTP 的 MCP 服务器,并通过 Python 客户端和前端页面与服务器进行交互。希望这篇文章对你有所帮助

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 实现一个支持 Streamable HTTP 的 MCP 服务器及客户端
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!