云计算百科
云计算领域专业知识百科平台

山东大学项目实训(2) AI响应服务器的搭建

山东大学项目实训(2) AI响应服务器的搭建

往期文章

山东大学项目实训(1) DeepSeek API调用

文章目录

  • 山东大学项目实训(2) AI响应服务器的搭建
    • 往期文章
    • 1. AI服务器的作用
    • 2. 我们的项目架构
    • 3. AI服务器编写
      • AI服务器问答交互
      • 多轮对话处理
      • 代码编写

1. AI服务器的作用

AI服务器就是下与接受deepseek模型交互, 获取AI回答后返回到应用中的中间服务器. 其功能如下

服务器功能

其实从上图看出AI服务器是一个很"鸡肋"的功能, 貌似后端同学可以直接调用deepseek模型, 但是这样的情形不利于我们分工开发和我进行模型的研究, 故我暂时实现了一个AI服务器, 其在后期可能融入后端当中, 详见下部分架构的介绍.

2. 我们的项目架构

我提出了1, 2两种架构, 加上我们组员提出的一种架构共三种

项目架构

这三种架构各有优劣, 我列举如下

  • 方案1
    • 几个优点: 开发各自独立, 很适合我们这样的小组作业, 且可扩展性高, 代码耦合度低. 适合跨语言作业, 像我们组后端使用java且AI服务使用python的情况下也可简单地使用http等通用协议调用.
    • 缺点: 后期问题追踪不易, 且显而易见的效率低, 但实则不然, 其效率在以下几种情形有优势:
    • 后端和服务器在一台服务器上运行, 此时二者间通信耗时很小, 当然速度还是不如方案2, 但初步考虑我们的用户需求应该够用.
    • 后端性能积压, 比如要处理搜题等复杂任务, 这时将AI服务器分离到别的服务器上可以减缓后端性能压力.
    • AI模型本地部署, 当我们将AI部署到本地时, AI服务器可直接承担AI模型的运作. 相当于架构方案图中的本地部署部分.
  • 方案2
    • 优点:效率较高.
    • 缺点:代码耦合度高, 后续扩展困难. 在本地部署时需要后端自行处理AI的输出.
  • 方案3
    • 优点:集成了方案1的架构优势和方案2的高效率
    • 缺点:前端工作量大, 且需要在数据统一处理和AI服务器不存数据间做出选择.

由于我们现在的需求较简单, 只在多用户对话的上下文处理间有些许疑问, 故三种架构均可, 我们暂定方案1. 因此下文中我都假设是后端调用AI服务器.

大模型的所有调用参数基本都可以在对话内容中给出, 其强大的语言处理能力使得用户个性化信息的传递非常容易. 例如:用户在数据结构领域答题正确率是70%, 我们只需要把这个信息告诉deepseek即可, 无需其它处理. 因此这使得AI服务器与后端的交互很简单. 换句话说, 这印证了AI服务器很"鸡肋" .

3. AI服务器编写

AI服务器问答交互

我们的大语言模型需求先定为智能问答, 智能答题和智能出题. 由于大语言模型的API参数较少, 因此我们只需设置不同的temperature和引导词即可. 引导词例如: {"role":"system", "content":"你在为用户出题, 请按照'<题目>题目内容</题目><答案>答案内容</答案>'的格式出题"} 引导词的设置可由后端或服务器端完成. 返回即为一条json对话或者流数据或者纯粹是AI回答的文本, 内容为AI的回答.

多轮对话处理

我们目前设定一个用户可以有若干个对话, 每个对话都有其唯一ID, 因此AI服务器只关注对话ID而不关注用户, 对于用户信息后端许通过参数传递或者直接放在与AI的对话中, 例如"用户在数据结构领域历史正确率为70%, 请据此出一道题目, 使其符合并能提高用户水平" . 这种个性化工作可由后端或者AI服务器完成, 我们组在后期合并阶段可由双方同学合作完成.

用户对话信息存储是个大问题. 我们暂时决定将对话记录存到用户本地, 还有一种可选方案是存到后端服务器上的数据库中.总而言之, 其大概率不会存在AI服务器本地. 因此, 我们需要在对话开始时将内容加载进AI服务器中. 考虑到服务器关断后重启等情况, 我选择处理以下两种情况:

  • 后端调用初始化函数, 在AI服务器缓存中注册对话ID, 并将后端给的历史记录存入服务器缓存中.
  • 后端进行对话请求, 这时缓存中若没有此对话的ID记录, 则新建一个 ,若有, 则读取历史对话记录.
  • 代码编写

    类的初始化, 其中save函数负责将内存中历史记录传回后端, 考虑到前/后端自主维护用户对话记录的设计, 一般用不到.

    class DeepSeekChatServer:
    def __init__(self):
    #flask框架
    self.app = Flask(__name__)
    CORS(self.app)
    #对话内容内存,重要
    self.conversations = {} # {conversation_id: [messages]}
    #api调用相关
    #一般人用deepseek官网,这是我们学校的api
    #记得把/v1/chat/completions部分加上
    #openai的库会自动给你添加而你自己写的request不会
    self.api_base = "http://学校给的api/v1/chat/completions"
    self.api_key = "sk-差点忘了删" #填写你自己的key
    #model类型也按官网来,这个是自己api上的类型
    self.AI_model="DeepSeek-R1"
    # 注册路由
    self.app.add_url_rule('/chat', 'chat_endpoint', self.chat_endpoint, methods=['POST'])
    self.app.add_url_rule('/save', 'save_conversation', self.save_conversation, methods=['POST'])
    self.app.add_url_rule('/load', 'load_conversation', self.load_conversation, methods=['POST'])

    读取历史记录与初始化对话记录

    def get_or_create_conversation(self, conversation_id=None):
    cid = conversation_id or str(uuid.uuid4())
    if cid not in self.conversations:
    self.conversations[cid] = []
    return cid
    def init_conversation(
    self,
    conversation_id: Optional[str] = None,
    init_messages: Optional[List[Dict]] = None
    ) > str:
    """初始化或重置对话,严格校验消息格式"""
    cid = conversation_id or str(uuid.uuid4())

    # 校验初始化消息
    if init_messages is not None:
    if not isinstance(init_messages, list):
    raise ValueError("初始化消息必须是消息列表")
    if not all(self._validate_message(msg) for msg in init_messages):
    raise ValueError("消息格式不合法")

    self.conversations[cid] = init_messages.copy() if init_messages else []
    return cid

    传输信息合法性检验,如果是信任的后端传来的,建议不使用以提高效率.

    def _validate_message(self, message: Dict) > bool:
    required_keys = {"role", "content"}
    return (
    isinstance(message, dict)
    and all(key in message for key in required_keys)
    and message["role"] in ("user", "assistant", "system")
    and isinstance(message["content"], str)
    and len(message["content"].strip()) > 0
    )

    主要的请求处理部分, 支持流式和非流式输出, 让deepseek给我加了很多错误处理部分. 注意data, header等部分其实可以写到类的属性里以提高效率, 不用每次都在代码里生成, 但是我们的项目还没固定, 先这么写着.

    def _build_headers(self):
    return {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {self.api_key}",
    "Accept": "application/json"
    }
    def generate_response(self, conversation_id, stream=False):
    messages = self.conversations.get(conversation_id, [])

    data = {
    "model": self.AI_model,
    "messages": messages,
    "temperature": 0.7,
    "stream": stream
    }

    response = requests.post(
    self.api_base,
    headers=self._build_headers(),
    json=data,
    stream=stream
    )

    if response.status_code != 200:
    return jsonify({"error": f"API Error: {response.text}"}), 500

    if stream:
    def generate():
    full_content = ""
    for line in response.iter_lines():
    if not line:
    continue

    decoded_line = line.decode('utf-8')
    if not decoded_line.startswith("data: "):
    continue

    chunk = json.loads(decoded_line[6:])
    if chunk=="[DONE]" :
    break

    try:
    if "content" in chunk["choices"][0]["delta"]:
    delta = chunk["choices"][0]["delta"]["content"]
    full_content += delta
    yield delta
    except json.JSONDecodeError:
    yield "[数据解析错误]"
    self.conversations[conversation_id].append({
    "role": "assistant",
    "content": full_content
    })
    return Response(generate(), mimetype='text/event-stream')
    else:
    content = response.json()["choices"][0]["message"]["content"]
    self.conversations[conversation_id].append({
    "role": "assistant",
    "content": content
    })
    return jsonify({
    "conversation_id": conversation_id,
    "response": content
    })

    向外暴露的接口与运行, 也让deepseek添加了很多错误处理.

    def chat_endpoint(self):
    data = request.json
    try:
    # 校验基础参数
    if 'message' not in data or not data['message'].strip():
    return jsonify({"error": "消息内容不能为空"}), 400
    # 处理对话ID
    cid = self.get_or_create_conversation(data.get('conversation_id'))
    # 构造消息
    new_message = {
    "role": "user",
    "content": data['message'].strip()
    }
    if not self._validate_message(new_message):
    return jsonify({"error": "无效的消息结构"}), 400
    # 添加消息
    self.conversations[cid].append(new_message)

    return self.generate_response(cid, data.get('stream', False))

    except KeyError:
    return jsonify({"error": "请求格式错误"}), 400

    def save_conversation(self):
    data = request.json
    cid = data['conversation_id']
    return jsonify({
    "conversation_id": cid,
    "messages": self.conversations.get(cid, [])
    })

    def load_conversation(self):
    data = request.json
    try:
    cid = self.init_conversation(
    data.get('conversation_id'),
    data['messages']
    )
    return jsonify({"conversation_id": cid}), 200
    except KeyError:
    return jsonify({"error": "缺少必要参数 messages"}), 400
    except ValueError as e:
    return jsonify({"error": str(e)}), 400

    def run(self, host='127.0.0.1', port=5000, debug=True):
    self.app.run(host=host, port=port, debug=debug)

    运行代码

    if __name__ == '__main__':
    server = DeepSeekChatServer()
    server.run()

    完整代码

    import os
    from typing import Dict, List, Optional
    import uuid
    import json
    from flask import Flask, request, jsonify, Response
    from flask_cors import CORS
    import requests

    class DeepSeekChatServer:
    def __init__(self):
    #flask框架
    self.app = Flask(__name__)
    CORS(self.app)
    #对话内容内存,重要
    self.conversations = {} # {conversation_id: [messages]}
    #api调用相关
    #一般人用deepseek官网,这是我们学校的api
    #记得把/v1/chat/completions部分加上
    #openai的库会自动给你添加而你自己写的request不会
    self.api_base = "http://某个api:某个端口/v1/chat/completions"
    self.api_key = "sk-又差点忘了删"
    self.AI_model="DeepSeek-R1"
    # 注册路由
    self.app.add_url_rule('/chat', 'chat_endpoint', self.chat_endpoint, methods=['POST'])
    self.app.add_url_rule('/save', 'save_conversation', self.save_conversation, methods=['POST'])
    self.app.add_url_rule('/load', 'load_conversation', self.load_conversation, methods=['POST'])

    def get_or_create_conversation(self, conversation_id=None):
    cid = conversation_id or str(uuid.uuid4())
    if cid not in self.conversations:
    self.conversations[cid] = []
    return cid
    def init_conversation(
    self,
    conversation_id: Optional[str] = None,
    init_messages: Optional[List[Dict]] = None
    ) > str:
    """初始化或重置对话,严格校验消息格式"""
    cid = conversation_id or str(uuid.uuid4())

    # 校验初始化消息
    if init_messages is not None:
    if not isinstance(init_messages, list):
    raise ValueError("初始化消息必须是消息列表")
    if not all(self._validate_message(msg) for msg in init_messages):
    raise ValueError("消息格式不合法")

    self.conversations[cid] = init_messages.copy() if init_messages else []
    return cid

    def _validate_message(self, message: Dict) > bool:
    required_keys = {"role", "content"}
    return (
    isinstance(message, dict)
    and all(key in message for key in required_keys)
    and message["role"] in ("user", "assistant", "system")
    and isinstance(message["content"], str)
    and len(message["content"].strip()) > 0
    )
    def _build_headers(self):
    return {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {self.api_key}",
    "Accept": "application/json"
    }
    def generate_response(self, conversation_id, stream=False):
    messages = self.conversations.get(conversation_id, [])

    data = {
    "model": self.AI_model,
    "messages": messages,
    "temperature": 0.7,
    "stream": stream
    }

    response = requests.post(
    self.api_base,
    headers=self._build_headers(),
    json=data,
    stream=stream
    )

    if response.status_code != 200:
    return jsonify({"error": f"API Error: {response.text}"}), 500

    if stream:
    def generate():
    full_content = ""
    for line in response.iter_lines():
    if not line:
    continue

    decoded_line = line.decode('utf-8')
    if not decoded_line.startswith("data: "):
    continue

    chunk = json.loads(decoded_line[6:])
    if chunk=="[DONE]" :
    break

    try:
    if "content" in chunk["choices"][0]["delta"]:
    delta = chunk["choices"][0]["delta"]["content"]
    full_content += delta
    yield delta
    except json.JSONDecodeError:
    yield "[数据解析错误]"
    self.conversations[conversation_id].append({
    "role": "assistant",
    "content": full_content
    })
    return Response(generate(), mimetype='text/event-stream')
    else:
    content = response.json()["choices"][0]["message"]["content"]
    self.conversations[conversation_id].append({
    "role": "assistant",
    "content": content
    })
    return jsonify({
    "conversation_id": conversation_id,
    "response": content
    })

    def chat_endpoint(self):
    data = request.json
    try:
    # 校验基础参数
    if 'message' not in data or not data['message'].strip():
    return jsonify({"error": "消息内容不能为空"}), 400
    # 处理对话ID
    cid = self.get_or_create_conversation(data.get('conversation_id'))
    # 构造消息
    new_message = {
    "role": "user",
    "content": data['message'].strip()
    }
    if not self._validate_message(new_message):
    return jsonify({"error": "无效的消息结构"}), 400
    # 添加消息
    self.conversations[cid].append(new_message)

    return self.generate_response(cid, data.get('stream', False))

    except KeyError:
    return jsonify({"error": "请求格式错误"}), 400

    def save_conversation(self):
    data = request.json
    cid = data['conversation_id']
    return jsonify({
    "conversation_id": cid,
    "messages": self.conversations.get(cid, [])
    })

    def load_conversation(self):
    data = request.json
    try:
    cid = self.init_conversation(
    data.get('conversation_id'),
    data['messages']
    )
    return jsonify({"conversation_id": cid}), 200
    except KeyError:
    return jsonify({"error": "缺少必要参数 messages"}), 400
    except ValueError as e:
    return jsonify({"error": str(e)}), 400

    def run(self, host='127.0.0.1', port=5000, debug=True):
    self.app.run(host=host, port=port, debug=debug)

    if __name__ == '__main__':
    server = DeepSeekChatServer()
    server.run()

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 山东大学项目实训(2) AI响应服务器的搭建
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!