一、MCP Server原理
1、MCP 的设计初衷和架构要点:
MCP(Model Context Protocol)是由 Anthropic 提出的一个协议,主要目的是让 AI 模型能够安全地访问外部上下文数据,而不需要 AI 模型直接接触用户的敏感本地系统。其关键目标是:
-
将AI 模型运行(第三方AI)与本地数据访问隔离开来
-
通过一种协议,让 AI 模型可以“请求”数据,而不是“拥有”数据访问权限
2、核心组件理解:
MCP Client(客户端) | 通常运行在第三方 AI 平台(如 Claude、ChatGPT、LangChain Agent 等)内部,用来发送数据访问请求 |
MCP Server(服务器) | 通常运行在用户本地(或企业内网),用于接收请求并访问本地资源,如数据库、文件系统、API 等,然后把结果返回给 MCP 客户端 |
模型本体(如 Claude) | 并不直接访问本地资源,而是通过 MCP 客户端转发请求 |
3、MCP Server 部署建议
-
应该部署在本地端(本地机 / 内网服务器)
-
不应暴露给公网第三方 AI
-
可以配置访问策略、身份认证、日志等功能
4、示例应用场景
企业知识问答 | Claude 接入 MCP,访问公司内网文档库生成回答 |
本地文件问答 | ChatGPT 插件通过 MCP 请求读取本地 PDF |
本地数据库分析 | LangChain Agent 经由 MCP 调用 SQL 查询本地 MySQL 数据 |
5、MCP Server 本质是什么?
MCP Server 其实就是一个符合 MCP 协议规范 的 Web Server,本质上是一个能接收 JSON 请求、按需访问本地资源(文件、数据库、API 等)并返回结果的 REST API 服务。
它的特点是:
-
遵循 MCP 协议的请求格式和响应格式
-
支持身份认证(可选)
-
执行“受控”的本地操作,比如读取文件、搜索数据库、调用脚本等
6、如何自己部署本地 MCP Server?
可以选择直接使用已有项目部署,也可以根据需要从零构建。
方案一:快速部署开源项目
以 robvdl/mcp-server 为例:
# 克隆项目
git clone https://github.com/robvdl/mcp-server.git
cd mcp-server
# 创建虚拟环境并安装依赖
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# 启动服务(默认在 localhost:3333)
python server.py
MCP Server 会监听请求,比如:
POST http://localhost:3333/context/v1/query
{
"input": "请读取 mynotes.txt",
"context": {
"resources": ["local:files"],
"tools": ["read_file"]
}
}
返回类似:
{
"output": "这里是 mynotes.txt 的内容:…"
}
方案二:用 Python FastAPI 自己写一个最小 MCP Server
from fastapi import FastAPI, Request
from pydantic import BaseModel
import os
app = FastAPI()
class QueryRequest(BaseModel):
input: str
context: dict
@app.post("/context/v1/query")
async def query(req: QueryRequest):
if "read_file" in req.context.get("tools", []):
filename = req.input.strip()
try:
with open(filename, "r") as f:
content = f.read()
return {"output": content}
except Exception as e:
return {"output": f"读取失败: {e}"}
else:
return {"output": "未知工具"}
运行:
uvicorn mcp_server:app –host 0.0.0.0 –port 3333
这就有了一个最简本地 MCP Server,可以配合 Claude / LangChain 使用。
二、示例:企业级 MCP Server 原型
1、目标资源整合清单:
📊 本地数据库 | MySQL / PostgreSQL / SQLite,支持 SQL 查询 |
📁 IBM ECM | 如 FileNet P8 / CM8,使用 API 访问文档元数据和内容 |
🗃️ NFS 文件系统 | 通过挂载路径直接读取文件(支持 PDF / 文本 / Office) |
🌀 IBM BPM | 通过 REST API 调用流程实例、获取任务状态等 |
🪟 Windows 工作组共享文件夹 | 通过 smbclient / smbprotocol 访问 SMB/CIFS 共享资源 |
1.架构建议
我们可以构建如下结构的 MCP Server:
Claude / LangChain / AI Agent
↓
MCP Client
↓
🔐 本地 MCP Server
├── 📊 db_adapter.py (数据库访问)
├── 📁 ecm_adapter.py (IBM ECM)
├── 📂 nfs_adapter.py (本地/NFS 文件访问)
├── 🌀 bpm_adapter.py (IBM BPM)
└── 🪟 smb_adapter.py (Windows 文件共享)
每一个模块我们都用 Python 实现,统一暴露 REST 接口,比如 /context/v1/query,然后根据 input 和 context.tools 分发到对应 adapter。
2.部署方式建议
-
✅ 使用 FastAPI 构建主服务(轻量、高性能)
-
✅ 各模块解耦(易扩展)
-
✅ 支持 Docker 化部署(可选)
-
✅ 支持身份认证(Token 或 API 密钥)
3.示例功能调用
请求:
{
"input": "select * from customers where city = 'Shanghai';",
"context": {
"tools": ["query_database"],
"params": {
"db_name": "crm",
"db_type": "mysql"
}
}
}
响应:
{
"output": "结果:客户总数 5,分别为:张三、李四…"
}
2、最小MCP Server原型
最小可运行的 MCP Server 原型,支持:
✅ 通过 HTTP 接口接收 SQL 查询请求 ✅ 执行本地数据库(支持 SQLite 或 MySQL)查询 ✅ 返回结构化查询结果
项目结构设计
mcp_server/
├── main.py # FastAPI 主服务
├── db_adapter.py # 数据库访问模块
├── requirements.txt # 依赖列表
└── config.py # 数据库连接配置
1. requirements.txt
fastapi
uvicorn
sqlalchemy
pymysql # 如果使用 MySQL
2. config.py(数据库连接配置)
# config.py
DATABASES = {
"sqlite": {
"url": "sqlite:///./test.db"
},
"mysql": {
"url": "mysql+pymysql://user:password@localhost:3306/yourdb"
}
}
3. db_adapter.py
# db_adapter.py
from sqlalchemy import create_engine, text
from config import DATABASES
def run_query(db_type: str, query: str):
if db_type not in DATABASES:
return f"不支持的数据库类型:{db_type}"
engine = create_engine(DATABASES[db_type]["url"])
try:
with engine.connect() as conn:
result = conn.execute(text(query))
rows = [dict(row._mapping) for row in result]
return rows
except Exception as e:
return f"查询失败:{e}"
4. main.py
# main.py
from fastapi import FastAPI, Request
from pydantic import BaseModel
from db_adapter import run_query
app = FastAPI()
class MCPRequest(BaseModel):
input: str
context: dict
@app.post("/context/v1/query")
async def handle_query(req: MCPRequest):
tools = req.context.get("tools", [])
params = req.context.get("params", {})
if "query_database" in tools:
db_type = params.get("db_type", "sqlite")
query = req.input
result = run_query(db_type, query)
return {"output": result}
else:
return {"output": "不支持的工具调用"}
5.启动 MCP Server
uvicorn main:app –reload –port 3333
这就有了一个运行在 http://localhost:3333/context/v1/query 的 MCP Server,可以处理 SQL 请求了。
6.测试 POST 请求(用 Postman 或 curl)
curl -X POST http://localhost:3333/context/v1/query \\
-H "Content-Type: application/json" \\
-d '{
"input": "SELECT * FROM customers LIMIT 5",
"context": {
"tools": ["query_database"],
"params": {
"db_type": "sqlite"
}
}
}'
3、部署MCP Server 的 NFS 文件访问模块
构建 MCP Server 的 NFS 文件访问模块,它将支持:
✅ 读取 NFS 挂载目录中的文件(文本、Markdown、PDF、Word 等) ✅ 通过 MCP 请求按路径访问文件内容 ✅ 返回内容片段用于 AI 理解(可分页 / 截断)
项目结构更新
mcp_server/
├── main.py # FastAPI 主服务
├── db_adapter.py # 数据库访问模块
├── file_adapter.py # NFS 文件访问模块 ✅
├── requirements.txt
├── config.py
1. 配置 NFS 路径(config.py)
# config.py
NFS_MOUNT_ROOT = "/mnt/nfs_shared" # 你的 NFS 挂载目录(例如通过 /etc/fstab 或 autofs 已挂载)
2. 实现 file_adapter.py
# file_adapter.py
import os
from config import NFS_MOUNT_ROOT
def read_file_from_nfs(rel_path: str, max_bytes=5000):
safe_path = os.path.normpath(os.path.join(NFS_MOUNT_ROOT, rel_path))
# 防止路径逃逸攻击
if not safe_path.startswith(NFS_MOUNT_ROOT):
return "非法路径"
if not os.path.exists(safe_path):
return f"文件不存在:{rel_path}"
try:
with open(safe_path, "rb") as f:
content = f.read(max_bytes)
try:
return content.decode("utf-8")
except UnicodeDecodeError:
return "文件不是 UTF-8 编码文本,无法解析"
except Exception as e:
return f"读取文件失败:{e}"
3. 修改 main.py 支持 read_nfs_file
# main.py(新增部分)
from file_adapter import read_file_from_nfs
@app.post("/context/v1/query")
async def handle_query(req: MCPRequest):
tools = req.context.get("tools", [])
params = req.context.get("params", {})
query = req.input
if "query_database" in tools:
db_type = params.get("db_type", "sqlite")
result = run_query(db_type, query)
return {"output": result}
elif "read_nfs_file" in tools:
file_path = query.strip() # 例如: "project/info.md"
result = read_file_from_nfs(file_path)
return {"output": result}
return {"output": "不支持的工具调用"}
4.示例请求(curl)
curl -X POST http://localhost:3333/context/v1/query \\
-H "Content-Type: application/json" \\
-d '{
"input": "documents/meeting_notes.txt",
"context": {
"tools": ["read_nfs_file"]
}
}'
确保已将 documents/meeting_notes.txt 放在 /mnt/nfs_shared/documents/meeting_notes.txt,即可读取内容。
4、加入PDF文件提供功能
加入 PDF 文件提取功能 是 MCP 读取企业文档非常关键的一步。我们将:
✅ 在原来的 read_file_from_nfs 基础上扩展 ✅ 自动判断文件类型,如果是 PDF,则提取文本内容返回
1. 安装 PDF 提取依赖
使用 pdfplumber,它能保留 PDF 中的段落结构,比 PyPDF2 更适合做内容理解。
pip install pdfplumber
可以把它加到 requirements.txt 里:
pdfplumber
2. 更新 file_adapter.py
# file_adapter.py
import os
import pdfplumber
from config import NFS_MOUNT_ROOT
def read_file_from_nfs(rel_path: str, max_chars=5000):
safe_path = os.path.normpath(os.path.join(NFS_MOUNT_ROOT, rel_path))
if not safe_path.startswith(NFS_MOUNT_ROOT):
return "非法路径"
if not os.path.exists(safe_path):
return f"文件不存在:{rel_path}"
# 判断文件类型
if rel_path.lower().endswith(".pdf"):
return extract_pdf_text(safe_path, max_chars)
else:
try:
with open(safe_path, "r", encoding="utf-8") as f:
return f.read(max_chars)
except UnicodeDecodeError:
return "文件不是 UTF-8 编码文本,无法解析"
except Exception as e:
return f"读取失败:{e}"
def extract_pdf_text(file_path: str, max_chars=5000):
try:
with pdfplumber.open(file_path) as pdf:
text = ''
for page in pdf.pages:
text += page.extract_text() or ''
if len(text) >= max_chars:
break
return text[:max_chars] or "PDF 提取内容为空"
except Exception as e:
return f"PDF 解析失败:{e}"
3.示例:读取 PDF
curl -X POST http://localhost:3333/context/v1/query \\
-H "Content-Type: application/json" \\
-d '{
"input": "reports/summary_2023.pdf",
"context": {
"tools": ["read_nfs_file"]
}
}'
只要把 summary_2023.pdf 放到 /mnt/nfs_shared/reports/ 下,系统就会自动解析 PDF 并返回前若干字符的文本内容。
4.可选增强功能
✅ 限制页面数 | 只解析前 N 页,提高速度 |
✅ 支持提取表格 | 使用 pdfplumber 的 extract_table() |
✅ 多语言支持 | 基于内容做 OCR 或语言标识 |
5、 IBM BPM / ECM 接入模块
接下来构建 MCP Server 的 IBM BPM + IBM ECM 接入模块,目标是:
✅ 通过统一接口读取 IBM BPM 流程信息(流程状态、当前节点、处理人等) ✅ 通过统一接口读取 IBM ECM 文件内容 / 元数据(如基于 FileNet) ✅ 封装为 MCP 的工具调用模块,供 AI 多智能体读取业务状态、内容材料等
1.设计概览
模块我们将分两个部分:
bpm_adapter.py | 查询流程状态、任务节点、处理人等 | 通过 IBM BPM REST API |
ecm_adapter.py | 查询文件元信息、预览文本内容 | 通过 FileNet Content Engine 或 IBM CMIS API |
2.MCP Server 项目结构更新
mcp_server/
├── main.py
├── config.py
├── db_adapter.py
├── file_adapter.py
├── bpm_adapter.py ✅ IBM BPM 访问模块
├── ecm_adapter.py ✅ IBM ECM 访问模块
├── auth.py
├── requirements.txt
3.IBM BPM 接入模块示例(bpm_adapter.py)
假设 BPM 使用 Business Automation Workflow,支持 REST 接口
# bpm_adapter.py
import requests
from config import BPM_API_BASE, BPM_AUTH
def get_bpm_process_info(process_id: str):
try:
url = f"{BPM_API_BASE}/rest/bpm/wle/v1/process/{process_id}"
headers = {"Authorization": f"Basic {BPM_AUTH}"}
resp = requests.get(url, headers=headers, timeout=10)
data = resp.json()
if data.get("status") != "200":
return f"流程查询失败:{data.get('statusReason')}"
instance = data["data"]["processInstance"]
return {
"流程ID": instance["piid"],
"状态": instance["executionState"],
"流程图": instance["bpdName"],
"当前环节": instance["activities"]
}
except Exception as e:
return f"连接 BPM 失败:{e}"
4.IBM ECM 接入模块示例(ecm_adapter.py)
假设 ECM 使用 FileNet P8,支持 REST 或 CMIS 接口
# ecm_adapter.py
import requests
from config import ECM_API_BASE, ECM_AUTH
def get_document_metadata(doc_id: str):
try:
url = f"{ECM_API_BASE}/wsi/FNCEWS40MTOM/"
headers = {
"Authorization": f"Basic {ECM_AUTH}",
"Content-Type": "application/soap+xml"
}
# 实际调用中这里要写 SOAP 请求体或 CMIS 请求体,这里简化:
# 用 CMIS 的话可直接用 atom/xml 读取
# 可替换为 requests.get(…)
# 示例简化返回
return {
"文档ID": doc_id,
"文件名": "销售合同.docx",
"创建人": "张三",
"创建时间": "2024-11-02",
"标签": ["合同", "客户A"]
}
except Exception as e:
return f"连接 ECM 失败:{e}"
5. MCP Server 扩展工具接口(main.py)
from bpm_adapter import get_bpm_process_info
from ecm_adapter import get_document_metadata
@app.post("/context/v1/query")
async def handle_query(
req: MCPRequest,
request: Request,
auth: None = Depends(verify_api_key)
):
tools = req.context.get("tools", [])
query = req.input.strip()
if "read_nfs_file" in tools:
return {"output": read_file_from_nfs(query)}
elif "query_bpm" in tools:
return {"output": get_bpm_process_info(query)}
elif "query_ecm" in tools:
return {"output": get_document_metadata(query)}
return {"output": "不支持的工具调用"}
6.示例调用
查询 BPM 流程状态:
curl -X POST http://localhost:3333/context/v1/query \\
-H "Content-Type: application/json" \\
-H "x-api-key: abc123xyz" \\
-d '{
"input": "12345",
"context": {
"tools": ["query_bpm"]
}
}'
查询 ECM 文档元数据:
curl -X POST http://localhost:3333/context/v1/query \\
-H "Content-Type: application/json" \\
-H "x-api-key: abc123xyz" \\
-d '{
"input": "doc-20240101001",
"context": {
"tools": ["query_ecm"]
}
}'
7、IBM ECM 中文全文提取模块
实现 IBM ECM 中文全文提取模块,支持将 ECM 中文文档内容提取出来供 AI 分析,比如用于:
-
🤖 多智能体基于文档生成报告/回答问题
-
🔎 LangChain + Claude 进行问答分析
-
📄 全文索引用于知识库构建
1.场景说明
假设你使用的是 IBM FileNet P8 或兼容 ECM 系统,提供了如下方式访问文件:
✅ REST 或 CMIS 接口可下载文档文件(PDF、Word 等)
✅ 文档以文件流形式返回,需保存/解析
✅ 中文内容提取需要:
-
PDF:用 pdfplumber / PyMuPDF
-
Word:用 python-docx
-
图片或扫描 PDF:可选 OCR:pytesseract / PaddleOCR
2.更新项目结构(新增模块)
mcp_server/
├── ecm_adapter.py ✅ ECM 中文提取逻辑
├── utils/
│ └── extract_text.py ✅ 文件类型识别 + 中文提取工具
3.文件内容提取工具(utils/extract_text.py)
# utils/extract_text.py
import os
import pdfplumber
import docx
from pathlib import Path
def extract_text_from_file(file_path: str) -> str:
suffix = Path(file_path).suffix.lower()
if suffix == ".pdf":
return extract_text_from_pdf(file_path)
elif suffix in [".doc", ".docx"]:
return extract_text_from_docx(file_path)
else:
return f"暂不支持的文件类型:{suffix}"
def extract_text_from_pdf(path):
try:
text = ""
with pdfplumber.open(path) as pdf:
for page in pdf.pages:
text += page.extract_text() + "\\n"
return text.strip() or "未提取到文本,可能是扫描件"
except Exception as e:
return f"PDF 提取失败: {e}"
def extract_text_from_docx(path):
try:
doc = docx.Document(path)
return "\\n".join([para.text for para in doc.paragraphs]).strip()
except Exception as e:
return f"Word 提取失败: {e}"
4.更新 ECM 接口(ecm_adapter.py)
# ecm_adapter.py
import requests
import tempfile
from config import ECM_API_BASE, ECM_AUTH
from utils.extract_text import extract_text_from_file
def get_document_content(doc_id: str):
try:
# 模拟从 ECM 下载文件
url = f"{ECM_API_BASE}/download/{doc_id}"
headers = {"Authorization": f"Basic {ECM_AUTH}"}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code != 200:
return f"下载失败:{response.status_code}"
# 临时保存文件
suffix = get_suffix_from_header(response)
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
tmp.write(response.content)
tmp_path = tmp.name
# 提取内容
return extract_text_from_file(tmp_path)
except Exception as e:
return f"获取文档失败:{e}"
def get_suffix_from_header(response):
ctype = response.headers.get("Content-Type", "")
if "pdf" in ctype:
return ".pdf"
elif "word" in ctype:
return ".docx"
else:
return ".bin"
5. 在 main.py 注册新工具
from ecm_adapter import get_document_metadata, get_document_content
# 新工具接入
elif "read_ecm_content" in tools:
return {"output": get_document_content(query)}
6. 示例调用:
curl http://localhost:3333/context/v1/query \\
-H "Content-Type: application/json" \\
-H "x-api-key: abc123xyz" \\
-d '{
"input": "doc-20240101-合同ABC",
"context": {
"tools": ["read_ecm_content"]
}
}'
8、ECM + BPM 联动模块构建
ECM + BPM 联动模块构建,这是实现智能化业务流程分析、文档驱动审批、Agent 自动辅助决策的核心能力。
1.目标说明:什么是 ECM + BPM 联动?
「流程 + 文档」联动 = 结合流程状态和绑定文档,构建完整业务上下文。
2.示例场景
合同审批 | 当前流程状态、处理人、节点 | 合同原文内容、创建人 | 自动生成审批摘要、智能辅助决策 |
档案归档 | 流程历史、操作记录 | 对应扫描件 | 流程 + 文件联合生成归档记录 |
投诉处理 | 投诉流程进度 | 投诉材料、聊天记录 | 多模态信息聚合、AI 报告分析 |
3.系统结构图
Claude / LangChain Agent
│
┌──────┴───────┐
│ MCP Server │
└──────┬───────┘
│
┌────────────┬─┴────────────┐
│ BPM模块 │ ECM模块 │
│ get_bpm_process_info() │
│ + get_linked_documents() │
└──────┬──────────────┬────┘
│ │
获取流程信息 获取流程绑定文档 → 提取全文
4.BPM 返回绑定文档 ID
在 bpm_adapter.py 中扩展函数,假设某些节点带有绑定文档:
def get_bpm_process_info(process_id: str):
…
instance = data["data"]["processInstance"]
linked_docs = []
for act in instance.get("activities", []):
# 假设附加属性中存放文档 ID
doc_id = act.get("attachedDocumentId")
if doc_id:
linked_docs.append(doc_id)
return {
"流程ID": instance["piid"],
"状态": instance["executionState"],
"流程图": instance["bpdName"],
"当前环节": instance["activities"],
"文档列表": linked_docs
}
5.ECM 提供内容提取
使用前面写好的 get_document_content(doc_id) 方法,依次提取这些文档。
6.联动函数:get_bpm_with_documents(process_id)
放在 mcp_server/bpm_ecm_combined.py:
# bpm_ecm_combined.py
from bpm_adapter import get_bpm_process_info
from ecm_adapter import get_document_content
def get_bpm_with_documents(process_id: str):
bpm_info = get_bpm_process_info(process_id)
if isinstance(bpm_info, str):
return f"BPM查询失败:{bpm_info}"
docs = bpm_info.get("文档列表", [])
doc_contents = []
for doc_id in docs:
content = get_document_content(doc_id)
doc_contents.append({
"文档ID": doc_id,
"内容": content[:1000] # 限制内容长度
})
return {
"流程信息": bpm_info,
"文档内容": doc_contents
}
7.在 main.py 中暴露工具调用接口
from bpm_ecm_combined import get_bpm_with_documents
@app.post("/context/v1/query")
async def handle_query(…):
…
elif "bpm_with_doc" in tools:
return {"output": get_bpm_with_documents(query)}
8.示例调用方式
curl -X POST http://localhost:3333/context/v1/query \\
-H "Content-Type: application/json" \\
-H "x-api-key: abc123xyz" \\
-d '{
"input": "流程ID: process-abc-001",
"context": {
"tools": ["bpm_with_doc"]
}
}'
9、自动生成【流程+文档】摘要并返回
自动生成【流程 + 文档】摘要并返回功能,让 Claude / GPT 等 AI Agent 在掌握流程状态和绑定文档内容后,生成一段清晰、专业的中文摘要。这是多智能体应用中的关键第一步!🚀
1.功能目标
给定一个流程 ID,系统自动:
获取流程信息(BPM)
获取绑定文档内容(ECM)
拼接上下文,调用 Claude / GPT 接口生成摘要 ✅ 最终返回结构化的摘要文本(可用于审批、归档、展示等)
2. 架构更新(新增 AI 摘要模块)
mcp_server/
├── bpm_adapter.py
├── ecm_adapter.py
├── bpm_ecm_combined.py
├── summarizer.py ✅ 新增:自动摘要模块
3. 构建摘要生成模块(summarizer.py)
# summarizer.py
import os
import openai # 也可接 Claude API 或本地 LLM
openai.api_key = os.getenv("OPENAI_API_KEY")
def generate_summary(bpm_info: dict, doc_contents: list) -> str:
# 拼接上下文内容
context_text = "【流程信息】\\n"
context_text += f"流程名称:{bpm_info['流程图']}\\n"
context_text += f"当前状态:{bpm_info['状态']}\\n"
context_text += "流程节点:\\n"
for act in bpm_info["当前环节"]:
context_text += f" – {act['name']}(状态:{act['state']})\\n"
context_text += "\\n【文档摘要】\\n"
for doc in doc_contents:
context_text += f"\\n文档 ID:{doc['文档ID']}\\n"
context_text += doc["内容"][:1000] + "\\n" # 控制长度
# 定义 Prompt
prompt = f"""你是企业流程文档专家,请阅读下列信息,输出一段中文摘要,要求重点清晰、表达简练:
{context_text}
==>
请用中文输出摘要:"""
# 调用 OpenAI API
try:
response = openai.ChatCompletion.create(
model="gpt-4", # 可改为 gpt-3.5 / Claude / 本地 LLM
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=512,
)
return response.choices[0].message.content.strip()
except Exception as e:
return f"AI 生成摘要失败:{e}"
4.联动调用(在 bpm_ecm_combined.py 中封装)
# bpm_ecm_combined.py
from summarizer import generate_summary
def get_bpm_with_summary(process_id: str):
bpm_info = get_bpm_process_info(process_id)
if isinstance(bpm_info, str):
return f"BPM查询失败:{bpm_info}"
docs = bpm_info.get("文档列表", [])
doc_contents = []
for doc_id in docs:
content = get_document_content(doc_id)
doc_contents.append({"文档ID": doc_id, "内容": content})
summary = generate_summary(bpm_info, doc_contents)
return {
"流程ID": process_id,
"摘要": summary,
"原始流程信息": bpm_info,
"文档内容预览": [{ "文档ID": d["文档ID"], "前几百字": d["内容"][:300] } for d in doc_contents]
}
5.API 接入 main.py
from bpm_ecm_combined import get_bpm_with_summary
elif "bpm_doc_summary" in tools:
return {"output": get_bpm_with_summary(query)}
6.测试调用(API 示例)
curl http://localhost:3333/context/v1/query \\
-H "Content-Type: application/json" \\
-H "x-api-key: abc123xyz" \\
-d '{
"input": "process-20240318-A合同审批",
"context": {
"tools": ["bpm_doc_summary"]
}
}'
7.返回示例结构
{
"流程ID": "process-20240318-A合同审批",
"摘要": "本流程为A公司与B公司采购合同审批流程,当前由法务部进行审阅。合同文本已由销售部上传,内容涉及年框协议及付款条款。需重点关注付款周期与违约责任。",
"原始流程信息": {…},
"文档内容预览": [
{ "文档ID": "doc-abc", "前几百字": "合同编号:2024-A001…" }
]
}
10、自动给出审批建议
让 Claude / GPT 基于流程和文档内容,自动给出审批建议,比如:
✅「建议通过审批,理由是……」 ❌「建议退回,因文档缺少……」 🟡「需补充说明……」
这就让 AI 真正参与到业务判断中,帮助用户加快流程、提升决策质量。下面是实现方式👇
1.目标功能:自动给出审批建议
-
输入:流程 ID
-
系统自动获取流程信息 + 文档内容
-
AI 输出:
-
是否建议通过(通过 / 不通过 / 需补充)
-
建议理由(引用文档、流程等)
2.模块结构更新
mcp_server/
├── bpm_adapter.py
├── ecm_adapter.py
├── summarizer.py
├── approval_advisor.py ✅ 新增:审批建议模块
├── bpm_ecm_combined.py
3.构建 approval_advisor.py
# approval_advisor.py
import os
import openai
openai.api_key = os.getenv("OPENAI_API_KEY")
def generate_approval_advice(bpm_info: dict, doc_contents: list) -> str:
context = "【流程信息】\\n"
context += f"流程名称:{bpm_info['流程图']}\\n当前状态:{bpm_info['状态']}\\n"
context += "流程节点:\\n"
for act in bpm_info["当前环节"]:
context += f" – {act['name']}(状态:{act['state']})\\n"
context += "\\n【文档内容提要】\\n"
for doc in doc_contents:
context += f"\\n文档ID:{doc['文档ID']}\\n"
context += doc["内容"][:1000] + "\\n"
prompt = f"""你是企业流程审批专家,请根据以下信息,判断该流程是否可以审批通过。请严格评估,并输出如下结构:
1. 是否建议通过审批(通过 / 不通过 / 需补充)
2. 建议理由(结合流程信息与文档内容,简明清晰)
以下是信息内容:
{context}
==>
请输出审批建议:"""
try:
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=512,
)
return response.choices[0].message.content.strip()
except Exception as e:
return f"AI 生成审批建议失败:{e}"
4.封装函数 get_bpm_with_advice
更新 bpm_ecm_combined.py:
from approval_advisor import generate_approval_advice
def get_bpm_with_advice(process_id: str):
bpm_info = get_bpm_process_info(process_id)
if isinstance(bpm_info, str):
return f"BPM查询失败:{bpm_info}"
docs = bpm_info.get("文档列表", [])
doc_contents = []
for doc_id in docs:
content = get_document_content(doc_id)
doc_contents.append({"文档ID": doc_id, "内容": content})
advice = generate_approval_advice(bpm_info, doc_contents)
return {
"流程ID": process_id,
"AI审批建议": advice,
"原始流程信息": bpm_info,
"文档预览": [{ "文档ID": d["文档ID"], "前几百字": d["内容"][:300] } for d in doc_contents]
}
5.在 main.py 中暴露 API
from bpm_ecm_combined import get_bpm_with_advice
elif "bpm_doc_advice" in tools:
return {"output": get_bpm_with_advice(query)}
6.测试调用(API 示例)
curl http://localhost:3333/context/v1/query \\
-H "Content-Type: application/json" \\
-H "x-api-key: abc123xyz" \\
-d '{
"input": "process-20240318-A合同审批",
"context": {
"tools": ["bpm_doc_advice"]
}
}'
7.返回结构示例
{
"流程ID": "process-20240318-A合同审批",
"AI审批建议": "1. 建议通过审批\\n2. 理由:流程已完成关键节点审核,且文档中已明确付款条款、违约责任,无明显遗漏。",
"原始流程信息": { … },
"文档预览": [
{ "文档ID": "doc-abc", "前几百字": "合同编号:2024-A001…" }
]
}
11、增加法务Agent,专门分析合同条款的法律风险
法务 Agent,让它专注分析合同条款的法律风险、合规性问题和关键注意事项,以 AI 法律顾问的身份提供专业判断,辅助业务审批流程。
1.法务 Agent 功能定位
🎯 聚焦分析 | 只分析合同中的法律条款内容 |
🛡️ 风险识别 | 识别如违约责任模糊、付款方式不合规、缺少争议解决条款等问题 |
⚖️ 合规建议 | 给出修正建议或标注缺失项 |
✅ 输出格式 | 风险等级(高/中/低)+ 问题摘要 + 修改建议 |
2.架构更新
mcp_server/
├── bpm_adapter.py
├── ecm_adapter.py
├── approval_advisor.py
├── summarizer.py
├── legal_advisor.py ✅ 新增:法务 Agent 模块
├── bpm_ecm_combined.py
3.新增 legal_advisor.py
# legal_advisor.py
import os
import openai
openai.api_key = os.getenv("OPENAI_API_KEY")
def analyze_legal_risks(doc_text: str) -> str:
prompt = f"""
你是企业合同法务专家,请你阅读以下合同条款内容,从法律合规角度进行风险分析。请识别以下问题并给出建议:
– 是否缺少关键法律条款(如违约责任、争议解决、适用法律)
– 是否存在模糊、风险性描述
– 是否条款之间存在冲突或遗漏
请按如下结构输出:
1. 风险等级(高 / 中 / 低)
2. 风险要点(列举存在的问题)
3. 修改建议
合同内容如下:
{doc_text[:3000]} # 控制上下文长度
"""
try:
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
max_tokens=512,
)
return response.choices[0].message.content.strip()
except Exception as e:
return f"法务风险分析失败:{e}"
4.组合调用(更新 bpm_ecm_combined.py)
from legal_advisor import analyze_legal_risks
def get_bpm_with_legal_advice(process_id: str):
bpm_info = get_bpm_process_info(process_id)
if isinstance(bpm_info, str):
return f"BPM查询失败:{bpm_info}"
docs = bpm_info.get("文档列表", [])
doc_contents = []
for doc_id in docs:
content = get_document_content(doc_id)
doc_contents.append({"文档ID": doc_id, "内容": content})
legal_reports = []
for doc in doc_contents:
if "合同" in doc["文档ID"] or "agreement" in doc["文档ID"].lower(): # 简单判断是否为合同
report = analyze_legal_risks(doc["内容"])
legal_reports.append({
"文档ID": doc["文档ID"],
"法律风险分析": report
})
return {
"流程ID": process_id,
"法律风险报告": legal_reports,
"文档预览": [{ "文档ID": d["文档ID"], "前几百字": d["内容"][:300] } for d in doc_contents]
}
5. API 接入(main.py)
from bpm_ecm_combined import get_bpm_with_legal_advice
elif "legal_risk_analysis" in tools:
return {"output": get_bpm_with_legal_advice(query)}
6.测试调用示例
curl http://localhost:3333/context/v1/query \\
-H "Content-Type: application/json" \\
-H "x-api-key: abc123xyz" \\
-d '{
"input": "process-20240318-A合同审批",
"context": {
"tools": ["legal_risk_analysis"]
}
}'
7.返回结构示例
{
"流程ID": "process-20240318-A合同审批",
"法律风险报告": [
{
"文档ID": "A公司与B公司采购合同",
"法律风险分析": "1. 风险等级:中\\n2. 风险要点:未包含争议解决条款,违约责任表述模糊。\\n3. 建议:补充争议解决条款,明确赔偿范围。"
}
],
"文档预览": [
{ "文档ID": "A公司与B公司采购合同", "前几百字": "合同编号:2024-A001…" }
]
}
12、法务辅助 + 自动修订 + BPM联动
接下来实现以下两个功能,打造真正具备“法务辅助 + 自动修订 + BPM联动”的智能流程系统:
功能一:将法务分析结果写入 BPM 流程备注字段
1. 实现逻辑
在 get_bpm_with_legal_advice 函数中分析法律风险
将分析结果用接口写入 BPM 系统中对应流程的备注字段(或评论、审阅日志)
2.BPM Adapter 增加接口(假设 BPM 支持 REST)
更新 bpm_adapter.py:
def write_bpm_comment(process_id: str, comment: str) -> bool:
try:
# 示例:假设我们调用本地 BPM API
url = f"http://localhost:8080/bpm/api/processes/{process_id}/comment"
response = requests.post(url, json={"comment": comment})
return response.status_code == 200
except Exception as e:
print(f"BPM写入备注失败:{e}")
return False
3.更新 get_bpm_with_legal_advice
def get_bpm_with_legal_advice(process_id: str):
bpm_info = get_bpm_process_info(process_id)
if isinstance(bpm_info, str):
return f"BPM查询失败:{bpm_info}"
docs = bpm_info.get("文档列表", [])
doc_contents = []
legal_reports = []
for doc_id in docs:
content = get_document_content(doc_id)
doc_contents.append({"文档ID": doc_id, "内容": content})
if "合同" in doc_id or "agreement" in doc_id.lower():
report = analyze_legal_risks(content)
legal_reports.append({"文档ID": doc_id, "法律风险分析": report})
# 组织写入内容
summary = "\\n\\n".join(
f"📄【{r['文档ID']}】\\n{r['法律风险分析']}" for r in legal_reports
)
comment = f"🛡️ 法务风险分析如下:\\n{summary}"
write_bpm_comment(process_id, comment)
return {
"流程ID": process_id,
"法律风险报告": legal_reports,
"备注已写入BPM": True,
}
功能二:AI 自动修订合同草稿
我们让 Claude 或 GPT 直接给出修改建议或返回优化后的合同草稿段落。
1.模块新增:contract_editor.py
# contract_editor.py
import os
import openai
openai.api_key = os.getenv("OPENAI_API_KEY")
def revise_contract(contract_text: str, legal_issues: str) -> str:
prompt = f"""
你是法律语言编辑专家。请根据以下法律风险点,优化修订这份合同内容,使其更加合规、清晰。不要过度润色,只做法律必要修正。
【法律问题】:
{legal_issues}
【原始合同片段】:
{contract_text[:3000]}
请返回优化后的修订版合同文本:
"""
try:
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=800,
)
return response.choices[0].message.content.strip()
except Exception as e:
return f"合同修订失败:{e}"
2.更新组合模块(如 bpm_ecm_combined.py)
加入修订建议功能:
from contract_editor import revise_contract
def get_legal_advice_with_rewrite(process_id: str):
bpm_info = get_bpm_process_info(process_id)
docs = bpm_info.get("文档列表", [])
legal_reports = []
revised_docs = []
for doc_id in docs:
content = get_document_content(doc_id)
if "合同" in doc_id or "agreement" in doc_id.lower():
legal_analysis = analyze_legal_risks(content)
legal_reports.append({
"文档ID": doc_id,
"法律风险分析": legal_analysis
})
revised_text = revise_contract(content, legal_analysis)
revised_docs.append({
"文档ID": doc_id,
"修订建议": revised_text
})
return {
"流程ID": process_id,
"法律风险报告": legal_reports,
"合同修订建议": revised_docs
}
3.API 调用方式
POST /query
{
"input": "process-20240318-A合同审批",
"context": {
"tools": ["legal_risk_analysis", "rewrite_contract"]
}
}
4.最终返回结构示例
{
"流程ID": "process-20240318-A合同审批",
"法律风险报告": [
{
"文档ID": "采购合同-A001",
"法律风险分析": "1. 风险等级:中\\n2. 问题:缺少违约赔偿机制\\n3. 建议:加入赔偿条款"
}
],
"合同修订建议": [
{
"文档ID": "采购合同-A001",
"修订建议": "…本协议第六条新增如下内容:若任何一方违约,应赔偿对方因此造成的实际经济损失…"
}
]
}
总结
✅ 法务风险识别 | 完成 |
✅ 风险写入 BPM 流程备注 | 完成 |
✅ AI 自动修订合同草稿 | 完成 |
✅ 返回结构化风险 + 建议 | 已完成 |
13、多 Agent 投票机制
多 Agent 投票机制,模拟多个法律专家对合同条款进行独立判断 + 投票表决,确保结论更具权威性和稳健性。
1.什么是多 Agent 投票机制?
多个大模型(Agent),独立分析合同风险,然后:
各自输出判断(风险等级、高亮问题、建议)
系统自动对比意见,归纳共识
若冲突,则输出“⚠️ 存在分歧,请人工确认”
2.投票架构设计
┌────────────┐
│ Contract │
│ Text │
└────┬───────┘
│
┌───────────────┼────────────────┐
▼ ▼ ▼
Claude Agent GPT Agent Mistral Agent
(风险分析A) (风险分析B) (风险分析C)
└───────────────┼────────────────┘
▼
🤖 合并 & 投票判断模块
▼
📝 最终风险共识 + 分歧标记
3.步骤一:创建多模型分析函数
我们新增 multi_agent_legal.py 模块:
# multi_agent_legal.py
from legal_advisor import analyze_legal_risks_with_model
def multi_agent_vote(contract_text: str):
models = {
"claude": "claude-3-opus-20240229",
"gpt": "gpt-4",
"mistral": "mistral-7b-instruct"
}
results = {}
for name, model in models.items():
result = analyze_legal_risks_with_model(contract_text, model)
results[name] = result
# 简单投票逻辑(可扩展为 NLP 共识判断)
risk_levels = {}
for name, text in results.items():
if "高" in text:
risk_levels[name] = "高"
elif "中" in text:
risk_levels[name] = "中"
else:
risk_levels[name] = "低"
votes = list(risk_levels.values())
if votes.count("高") >= 2:
final_level = "高"
elif votes.count("中") >= 2:
final_level = "中"
else:
final_level = "低"
consensus = {
"最终风险等级": final_level,
"各Agent判断": risk_levels,
"详细报告": results
}
return consensus
4.步骤二:支持不同模型调用(修改 legal_advisor.py)
def analyze_legal_risks_with_model(doc_text: str, model: str) -> str:
prompt = f"""
你是合同法律审查专家,请分析以下合同段落的法律风险、缺陷,并提出修改建议。
【合同内容】:
{doc_text[:3000]}
"""
try:
response = openai.ChatCompletion.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
max_tokens=512,
)
return response.choices[0].message.content.strip()
except Exception as e:
return f"[{model}] 风险分析失败:{e}"
5.步骤三:在流程中引入投票分析
更新组合模块,如:
from multi_agent_legal import multi_agent_vote
def get_legal_vote_result(process_id: str):
bpm_info = get_bpm_process_info(process_id)
docs = bpm_info.get("文档列表", [])
vote_reports = []
for doc_id in docs:
content = get_document_content(doc_id)
if "合同" in doc_id or "agreement" in doc_id.lower():
vote_result = multi_agent_vote(content)
vote_reports.append({
"文档ID": doc_id,
"最终风险等级": vote_result["最终风险等级"],
"多Agent判断": vote_result["各Agent判断"],
"详细报告": vote_result["详细报告"]
})
return {
"流程ID": process_id,
"合同多Agent风险共识报告": vote_reports
}
6.示例返回结构
{
"流程ID": "A合同审批",
"合同多Agent风险共识报告": [
{
"文档ID": "采购合同-A001",
"最终风险等级": "中",
"多Agent判断": {
"claude": "中",
"gpt": "中",
"mistral": "低"
},
"详细报告": {
"claude": "存在付款条款模糊等问题",
"gpt": "未指明争议解决方式,建议补充",
"mistral": "整体风险较低,仅建议添加付款节点"
}
}
]
}
14、功能增强
功能 1:在 ECM 中创建“法律风险审查记录”文档
模块新增:ecm_writer.py
# ecm_writer.py
import requests
def create_legal_review_doc(doc_id: str, vote_result: dict) -> bool:
payload = {
"title": f"法律风险分析报告 – {doc_id}",
"metadata": {
"原文档ID": doc_id,
"风险等级": vote_result["最终风险等级"],
"多Agent判断": str(vote_result["各Agent判断"])
},
"content": vote_result["详细报告"]
}
try:
res = requests.post("http://localhost:8080/ecm/api/create_doc", json=payload)
return res.status_code == 200
except Exception as e:
print(f"ECM写入失败:{e}")
return False
功能 2:BPM 流程备注中写入投票结果摘要
复用之前的 write_bpm_comment:
def summarize_vote(vote_result: dict) -> str:
votes = vote_result["各Agent判断"]
return f"""🗳️ 法务Agent投票结果:
Claude:{votes.get('claude')}
GPT:{votes.get('gpt')}
Mistral:{votes.get('mistral')}
最终风险等级:{vote_result['最终风险等级']}"""
在流程中这样调用:
comment = summarize_vote(vote_result)
write_bpm_comment(process_id, comment)
功能 3:前端仪表盘显示投票 & 合同风险数据
使用 React + Tailwind 构建一个仪表盘(建议用 shadcn/ui)
我们来生成一个可部署的 React 页面:
// components/LegalDashboard.tsx
import { Card, CardContent } from "@/components/ui/card"
import { BarChart2, ShieldAlert } from "lucide-react"
export default function LegalDashboard({ data }) {
return (
<div className="grid gap-4 p-6 grid-cols-1 md:grid-cols-2">
{data.map((item: any) => (
<Card key={item.文档ID} className="shadow-lg rounded-2xl">
<CardContent className="p-4 space-y-2">
<div className="flex justify-between items-center">
<h2 className="text-xl font-bold">{item.文档ID}</h2>
<span className={`text-sm px-2 py-1 rounded-xl ${item.最终风险等级 === "高" ? "bg-red-100 text-red-700" : item.最终风险等级 === "中" ? "bg-yellow-100 text-yellow-800" : "bg-green-100 text-green-700"}`}>
风险:{item.最终风险等级}
</span>
</div>
<div className="text-sm">
<p><ShieldAlert className="inline mr-1" /> Claude:{item.多Agent判断.claude}</p>
<p><ShieldAlert className="inline mr-1" /> GPT:{item.多Agent判断.gpt}</p>
<p><ShieldAlert className="inline mr-1" /> Mistral:{item.多Agent判断.mistral}</p>
</div>
<details className="text-xs mt-2">
<summary className="cursor-pointer">📝 查看详细报告</summary>
<pre className="whitespace-pre-wrap mt-1">{JSON.stringify(item.详细报告, null, 2)}</pre>
</details>
</CardContent>
</Card>
))}
</div>
)
}
数据可通过 /api/legal_risks 动态获取。
总集成函数(主逻辑)
def analyze_contract_process(process_id: str):
bpm_info = get_bpm_process_info(process_id)
vote_reports = []
for doc_id in bpm_info.get("文档列表", []):
content = get_document_content(doc_id)
if "合同" in doc_id:
vote_result = multi_agent_vote(content)
# 写入 BPM
comment = summarize_vote(vote_result)
write_bpm_comment(process_id, comment)
# 写入 ECM
create_legal_review_doc(doc_id, vote_result)
vote_reports.append({
"文档ID": doc_id,
**vote_result
})
return vote_reports
最终系统架构一览
📄 文档提取 → 🧠 多Agent分析 → 🗳️ 投票归纳
│ │ │
▼ ▼ ▼
📎 ECM文档 📝 BPM备注 📊 前端仪表盘
15、增加前端数据源、邮件合规通知、把React前端部署到Docker里
步骤一:FastAPI 提供 /api/legal_risks 接口(前端数据源)
新建 FastAPI 路由 routes/legal_api.py
# routes/legal_api.py
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from backend.legal_workflow import analyze_contract_process
router = APIRouter()
@router.get("/api/legal_risks")
def get_legal_risks(process_id: str):
try:
report = analyze_contract_process(process_id)
return JSONResponse(content=report)
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
主服务引入路由(main.py)
# main.py
from fastapi import FastAPI
from routes import legal_api
app = FastAPI()
app.include_router(legal_api.router)
现在你只要访问:
http://localhost:8000/api/legal_risks?process_id=A合同审批
即可返回 JSON 报告给前端用!
步骤二:添加邮件通知合规员功能
我们添加一个通用的邮件模块 utils/email_notify.py:
# utils/email_notify.py
import smtplib
from email.mime.text import MIMEText
SMTP_SERVER = "smtp.example.com"
SMTP_PORT = 587
SMTP_USER = "your_user@example.com"
SMTP_PASS = "your_password"
def send_email(to: str, subject: str, body: str):
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = SMTP_USER
msg["To"] = to
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASS)
server.sendmail(SMTP_USER, to, msg.as_string())
然后在 analyze_contract_process() 中加入通知逻辑:
from utils.email_notify import send_email
# 内部流程中
send_email(
to="compliance@company.com",
subject=f"【新合同风险报告】{doc_id}",
body=f"风险等级:{vote_result['最终风险等级']}\\n请登录仪表盘查看详情。"
)
步骤三:React 仪表盘部署成 Docker 镜像
文件结构
legal-dashboard/
├── public/
├── src/
│ └── components/LegalDashboard.tsx
├── .dockerignore
├── Dockerfile
├── package.json
└── vite.config.ts
Dockerfile 示例(Dockefile)
# Step 1: Build
FROM node:20 AS builder
WORKDIR /app
COPY . .
RUN npm install && npm run build
# Step 2: Serve with nginx
FROM nginx:alpine
COPY –from=builder /app/dist /usr/share/nginx/html
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]
.dockerignore(Nginx)
node_modules
dist
构建与运行
docker build -t legal-dashboard .
docker run -d -p 3000:80 legal-dashboard
然后访问:
http://localhost:3000
前端将自动访问:
http://localhost:8000/api/legal_risks?process_id=A合同审批
并展示最新的投票结果。
评论前必须登录!
注册