山东大学项目实训(2) AI响应服务器的搭建
往期文章
山东大学项目实训(1) DeepSeek API调用
文章目录
- 山东大学项目实训(2) AI响应服务器的搭建
-
- 往期文章
- 1. AI服务器的作用
- 2. 我们的项目架构
- 3. AI服务器编写
-
- AI服务器问答交互
- 多轮对话处理
- 代码编写
1. AI服务器的作用
AI服务器就是下与接受deepseek模型交互, 获取AI回答后返回到应用中的中间服务器. 其功能如下
其实从上图看出AI服务器是一个很"鸡肋"的功能, 貌似后端同学可以直接调用deepseek模型, 但是这样的情形不利于我们分工开发和我进行模型的研究, 故我暂时实现了一个AI服务器, 其在后期可能融入后端当中, 详见下部分架构的介绍.
2. 我们的项目架构
我提出了1, 2两种架构, 加上我们组员提出的一种架构共三种
这三种架构各有优劣, 我列举如下
- 方案1
- 几个优点: 开发各自独立, 很适合我们这样的小组作业, 且可扩展性高, 代码耦合度低. 适合跨语言作业, 像我们组后端使用java且AI服务使用python的情况下也可简单地使用http等通用协议调用.
- 缺点: 后期问题追踪不易, 且显而易见的效率低, 但实则不然, 其效率在以下几种情形有优势:
- 后端和服务器在一台服务器上运行, 此时二者间通信耗时很小, 当然速度还是不如方案2, 但初步考虑我们的用户需求应该够用.
- 后端性能积压, 比如要处理搜题等复杂任务, 这时将AI服务器分离到别的服务器上可以减缓后端性能压力.
- AI模型本地部署, 当我们将AI部署到本地时, AI服务器可直接承担AI模型的运作. 相当于架构方案图中的本地部署部分.
- 优点:效率较高.
- 缺点:代码耦合度高, 后续扩展困难. 在本地部署时需要后端自行处理AI的输出.
- 优点:集成了方案1的架构优势和方案2的高效率
- 缺点:前端工作量大, 且需要在数据统一处理和AI服务器不存数据间做出选择.
由于我们现在的需求较简单, 只在多用户对话的上下文处理间有些许疑问, 故三种架构均可, 我们暂定方案1. 因此下文中我都假设是后端调用AI服务器.
大模型的所有调用参数基本都可以在对话内容中给出, 其强大的语言处理能力使得用户个性化信息的传递非常容易. 例如:用户在数据结构领域答题正确率是70%, 我们只需要把这个信息告诉deepseek即可, 无需其它处理. 因此这使得AI服务器与后端的交互很简单. 换句话说, 这印证了AI服务器很"鸡肋" .
3. AI服务器编写
AI服务器问答交互
我们的大语言模型需求先定为智能问答, 智能答题和智能出题. 由于大语言模型的API参数较少, 因此我们只需设置不同的temperature和引导词即可. 引导词例如: {"role":"system", "content":"你在为用户出题, 请按照'<题目>题目内容</题目><答案>答案内容</答案>'的格式出题"} 引导词的设置可由后端或服务器端完成. 返回即为一条json对话或者流数据或者纯粹是AI回答的文本, 内容为AI的回答.
多轮对话处理
我们目前设定一个用户可以有若干个对话, 每个对话都有其唯一ID, 因此AI服务器只关注对话ID而不关注用户, 对于用户信息后端许通过参数传递或者直接放在与AI的对话中, 例如"用户在数据结构领域历史正确率为70%, 请据此出一道题目, 使其符合并能提高用户水平" . 这种个性化工作可由后端或者AI服务器完成, 我们组在后期合并阶段可由双方同学合作完成.
用户对话信息存储是个大问题. 我们暂时决定将对话记录存到用户本地, 还有一种可选方案是存到后端服务器上的数据库中.总而言之, 其大概率不会存在AI服务器本地. 因此, 我们需要在对话开始时将内容加载进AI服务器中. 考虑到服务器关断后重启等情况, 我选择处理以下两种情况:
代码编写
类的初始化, 其中save函数负责将内存中历史记录传回后端, 考虑到前/后端自主维护用户对话记录的设计, 一般用不到.
class DeepSeekChatServer:
def __init__(self):
#flask框架
self.app = Flask(__name__)
CORS(self.app)
#对话内容内存,重要
self.conversations = {} # {conversation_id: [messages]}
#api调用相关
#一般人用deepseek官网,这是我们学校的api
#记得把/v1/chat/completions部分加上
#openai的库会自动给你添加而你自己写的request不会
self.api_base = "http://学校给的api/v1/chat/completions"
self.api_key = "sk-差点忘了删" #填写你自己的key
#model类型也按官网来,这个是自己api上的类型
self.AI_model="DeepSeek-R1"
# 注册路由
self.app.add_url_rule('/chat', 'chat_endpoint', self.chat_endpoint, methods=['POST'])
self.app.add_url_rule('/save', 'save_conversation', self.save_conversation, methods=['POST'])
self.app.add_url_rule('/load', 'load_conversation', self.load_conversation, methods=['POST'])
读取历史记录与初始化对话记录
def get_or_create_conversation(self, conversation_id=None):
cid = conversation_id or str(uuid.uuid4())
if cid not in self.conversations:
self.conversations[cid] = []
return cid
def init_conversation(
self,
conversation_id: Optional[str] = None,
init_messages: Optional[List[Dict]] = None
) –> str:
"""初始化或重置对话,严格校验消息格式"""
cid = conversation_id or str(uuid.uuid4())
# 校验初始化消息
if init_messages is not None:
if not isinstance(init_messages, list):
raise ValueError("初始化消息必须是消息列表")
if not all(self._validate_message(msg) for msg in init_messages):
raise ValueError("消息格式不合法")
self.conversations[cid] = init_messages.copy() if init_messages else []
return cid
传输信息合法性检验,如果是信任的后端传来的,建议不使用以提高效率.
def _validate_message(self, message: Dict) –> bool:
required_keys = {"role", "content"}
return (
isinstance(message, dict)
and all(key in message for key in required_keys)
and message["role"] in ("user", "assistant", "system")
and isinstance(message["content"], str)
and len(message["content"].strip()) > 0
)
主要的请求处理部分, 支持流式和非流式输出, 让deepseek给我加了很多错误处理部分. 注意data, header等部分其实可以写到类的属性里以提高效率, 不用每次都在代码里生成, 但是我们的项目还没固定, 先这么写着.
def _build_headers(self):
return {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json"
}
def generate_response(self, conversation_id, stream=False):
messages = self.conversations.get(conversation_id, [])
data = {
"model": self.AI_model,
"messages": messages,
"temperature": 0.7,
"stream": stream
}
response = requests.post(
self.api_base,
headers=self._build_headers(),
json=data,
stream=stream
)
if response.status_code != 200:
return jsonify({"error": f"API Error: {response.text}"}), 500
if stream:
def generate():
full_content = ""
for line in response.iter_lines():
if not line:
continue
decoded_line = line.decode('utf-8')
if not decoded_line.startswith("data: "):
continue
chunk = json.loads(decoded_line[6:])
if chunk=="[DONE]" :
break
try:
if "content" in chunk["choices"][0]["delta"]:
delta = chunk["choices"][0]["delta"]["content"]
full_content += delta
yield delta
except json.JSONDecodeError:
yield "[数据解析错误]"
self.conversations[conversation_id].append({
"role": "assistant",
"content": full_content
})
return Response(generate(), mimetype='text/event-stream')
else:
content = response.json()["choices"][0]["message"]["content"]
self.conversations[conversation_id].append({
"role": "assistant",
"content": content
})
return jsonify({
"conversation_id": conversation_id,
"response": content
})
向外暴露的接口与运行, 也让deepseek添加了很多错误处理.
def chat_endpoint(self):
data = request.json
try:
# 校验基础参数
if 'message' not in data or not data['message'].strip():
return jsonify({"error": "消息内容不能为空"}), 400
# 处理对话ID
cid = self.get_or_create_conversation(data.get('conversation_id'))
# 构造消息
new_message = {
"role": "user",
"content": data['message'].strip()
}
if not self._validate_message(new_message):
return jsonify({"error": "无效的消息结构"}), 400
# 添加消息
self.conversations[cid].append(new_message)
return self.generate_response(cid, data.get('stream', False))
except KeyError:
return jsonify({"error": "请求格式错误"}), 400
def save_conversation(self):
data = request.json
cid = data['conversation_id']
return jsonify({
"conversation_id": cid,
"messages": self.conversations.get(cid, [])
})
def load_conversation(self):
data = request.json
try:
cid = self.init_conversation(
data.get('conversation_id'),
data['messages']
)
return jsonify({"conversation_id": cid}), 200
except KeyError:
return jsonify({"error": "缺少必要参数 messages"}), 400
except ValueError as e:
return jsonify({"error": str(e)}), 400
def run(self, host='127.0.0.1', port=5000, debug=True):
self.app.run(host=host, port=port, debug=debug)
运行代码
if __name__ == '__main__':
server = DeepSeekChatServer()
server.run()
完整代码
import os
from typing import Dict, List, Optional
import uuid
import json
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
import requests
class DeepSeekChatServer:
def __init__(self):
#flask框架
self.app = Flask(__name__)
CORS(self.app)
#对话内容内存,重要
self.conversations = {} # {conversation_id: [messages]}
#api调用相关
#一般人用deepseek官网,这是我们学校的api
#记得把/v1/chat/completions部分加上
#openai的库会自动给你添加而你自己写的request不会
self.api_base = "http://某个api:某个端口/v1/chat/completions"
self.api_key = "sk-又差点忘了删"
self.AI_model="DeepSeek-R1"
# 注册路由
self.app.add_url_rule('/chat', 'chat_endpoint', self.chat_endpoint, methods=['POST'])
self.app.add_url_rule('/save', 'save_conversation', self.save_conversation, methods=['POST'])
self.app.add_url_rule('/load', 'load_conversation', self.load_conversation, methods=['POST'])
def get_or_create_conversation(self, conversation_id=None):
cid = conversation_id or str(uuid.uuid4())
if cid not in self.conversations:
self.conversations[cid] = []
return cid
def init_conversation(
self,
conversation_id: Optional[str] = None,
init_messages: Optional[List[Dict]] = None
) –> str:
"""初始化或重置对话,严格校验消息格式"""
cid = conversation_id or str(uuid.uuid4())
# 校验初始化消息
if init_messages is not None:
if not isinstance(init_messages, list):
raise ValueError("初始化消息必须是消息列表")
if not all(self._validate_message(msg) for msg in init_messages):
raise ValueError("消息格式不合法")
self.conversations[cid] = init_messages.copy() if init_messages else []
return cid
def _validate_message(self, message: Dict) –> bool:
required_keys = {"role", "content"}
return (
isinstance(message, dict)
and all(key in message for key in required_keys)
and message["role"] in ("user", "assistant", "system")
and isinstance(message["content"], str)
and len(message["content"].strip()) > 0
)
def _build_headers(self):
return {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json"
}
def generate_response(self, conversation_id, stream=False):
messages = self.conversations.get(conversation_id, [])
data = {
"model": self.AI_model,
"messages": messages,
"temperature": 0.7,
"stream": stream
}
response = requests.post(
self.api_base,
headers=self._build_headers(),
json=data,
stream=stream
)
if response.status_code != 200:
return jsonify({"error": f"API Error: {response.text}"}), 500
if stream:
def generate():
full_content = ""
for line in response.iter_lines():
if not line:
continue
decoded_line = line.decode('utf-8')
if not decoded_line.startswith("data: "):
continue
chunk = json.loads(decoded_line[6:])
if chunk=="[DONE]" :
break
try:
if "content" in chunk["choices"][0]["delta"]:
delta = chunk["choices"][0]["delta"]["content"]
full_content += delta
yield delta
except json.JSONDecodeError:
yield "[数据解析错误]"
self.conversations[conversation_id].append({
"role": "assistant",
"content": full_content
})
return Response(generate(), mimetype='text/event-stream')
else:
content = response.json()["choices"][0]["message"]["content"]
self.conversations[conversation_id].append({
"role": "assistant",
"content": content
})
return jsonify({
"conversation_id": conversation_id,
"response": content
})
def chat_endpoint(self):
data = request.json
try:
# 校验基础参数
if 'message' not in data or not data['message'].strip():
return jsonify({"error": "消息内容不能为空"}), 400
# 处理对话ID
cid = self.get_or_create_conversation(data.get('conversation_id'))
# 构造消息
new_message = {
"role": "user",
"content": data['message'].strip()
}
if not self._validate_message(new_message):
return jsonify({"error": "无效的消息结构"}), 400
# 添加消息
self.conversations[cid].append(new_message)
return self.generate_response(cid, data.get('stream', False))
except KeyError:
return jsonify({"error": "请求格式错误"}), 400
def save_conversation(self):
data = request.json
cid = data['conversation_id']
return jsonify({
"conversation_id": cid,
"messages": self.conversations.get(cid, [])
})
def load_conversation(self):
data = request.json
try:
cid = self.init_conversation(
data.get('conversation_id'),
data['messages']
)
return jsonify({"conversation_id": cid}), 200
except KeyError:
return jsonify({"error": "缺少必要参数 messages"}), 400
except ValueError as e:
return jsonify({"error": str(e)}), 400
def run(self, host='127.0.0.1', port=5000, debug=True):
self.app.run(host=host, port=port, debug=debug)
if __name__ == '__main__':
server = DeepSeekChatServer()
server.run()
评论前必须登录!
注册