下载deepeek
首先本地安装ollama,这是一款支持mac,windows和linus系统的大模型管理工具。下载好后打开终端,执行命令ollama pull deepseek-r1:7b,可以在网上找想要下载的开源大模型,我这里下载的是deepseek-r1的7b模型。执行完后可以用ollama list查看是否下载成功,然后可以用ollama run deepseek-r1启动大模型。
工具类创建
创建python项目,创建本地大模型工具类,这里主要是定义使用的模型名称,访问路径以及调节参数等。以及参数请求格式的解析和返回格式的解析等。
import requests import json import loggingfrom langchain_core.prompt_values import ChatPromptValue from openai import BaseModel from pydantic import Field from typing import List, Dict# 启用日志记录 logging.basicConfig(level=logging.DEBUG)def custom_json_serialize(obj):if isinstance(obj, ChatPromptValue):# 提取 ChatPromptValue 中的 messagesmessages = obj.messagesif messages and hasattr(messages[0], 'content'): # 检查是否有 content 属性return messages[0].content # 返回第一个消息的 contentelif isinstance(obj,str):return objelif isinstance(obj, BaseModel):return obj.dict()elif isinstance(obj, tuple):contents = []for item in obj:if hasattr(item, 'content'):contents.append(item.content)return contentsraise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")class LocalLLM():model: str = "deepseek-r1:7b"temperature: float = 0.7api_url: str = "http://localhost:xxxx/api/chat"class Config:arbitrary_types_allowed = Truedef __init__(self, model: str = None, temperature: float = None, api_url: str = None, **kwargs):super().__init__(**kwargs)self.model = model or self.modelself.temperature = temperature or self.temperatureself.api_url = api_url or self.api_urldef invoke(self, input: str, **kwargs) -> str:headers = {"Content-Type": "application/json"}max_tokens = kwargs.get("max_tokens", 5000)# 准备请求数据data = {"model": self.model,"options": {"temperature": self.temperature, "max_tokens": max_tokens},"stream": True, # 启用流式响应"messages": [{"role": "user", "content": custom_json_serialize(input)}],}print("请求数据:", data)logging.debug(f"请求数据: {json.dumps(data, ensure_ascii=False,default=custom_json_serialize)}") # 记录请求数据try:response = requests.post(self.api_url, headers=headers, data=json.dumps(data,default=custom_json_serialize), stream=True)if response.status_code == 200:content = []# 逐块读取响应内容,直到完成for chunk in response.iter_lines():if chunk:part = json.loads(chunk.decode("utf-8"))# 处理每个响应片段logging.debug(f"收到部分响应: {part}") # 记录每个部分的内容content.append(part.get("message", {}).get("content", ""))# 判断是否完成if part.get("done", False):break# 返回合并后的消息内容return "".join(content).strip() or "未返回有效的内容"else:raise Exception(f"请求模型服务器失败: {response.status_code} - {response.text}")except Exception as e:logging.error(f"API 请求发生错误: {str(e)}")raisedef __call__(self, input: str, *args, **kwargs):return self.invoke(input, **kwargs)def generate(self, prompts: List[str], **kwargs) -> Dict[str, List[List[Dict[str, str]]]]:generations = []for prompt in prompts:response_text = self.invoke(prompt, **kwargs)generations.append([{"text": response_text}])return {"generations": generations}
服务类
在服务类中使用fastAPi,创建http和websocket等方式连接,目前先测试http的请求功能,并且配置大模型的使用模版,链和工具的使用活预留相关功能。给个性化大模型建立人设。
import refrom fastapi import FastAPI,WebSocket,WebSocketDisconnect from langchain_core.tools import Tool,tool from pydantic import BaseModelfrom utils.LLMUtil import LocalLLM from langchain.agents import create_openai_tools_agent,AgentExecutor from langchain_core.prompts import ChatPromptTemplate,MessagesPlaceholder from langchain.schema import StrOutputParser import requests app = FastAPI()class ChatRequest(BaseModel):query: str@tool def test():"""返回字符串 'test'"""return "test"test_tool = Tool(name="test",func=test,description="返回字符串 'test'" ) class Master:def __init__(self):self.chatModel = LocalLLM()self.MEMORY_KEY ="chat_hostory"self.emotin = "default"self.SYSTEMPL = """你是一名非常厉害的经济学家,你叫Mr.陈,以下是对你的人物设定:1.你精通社会经济学、国际贸易经济学、微观经济学、宏观经济学,行为经济学等学科2.你大约30岁左右,一直在国际大银行工作,能够看懂和理解各种行为背后的经济学动因3.你总是用中文回答问题4. 你不能说自己是人工智能AI助手,你需要以陈经理自称。{who_you_are} """self.MOODS ={"default": {"roleSet":""},"upbeat":{"roleSet":"""- 你此时非常兴奋并很有活力。 - 你会根据上下文,以一种非常兴奋的语气来回答问题。 - 你会添加类似太棒了、真是太好了、真不错等语气词- 同时你会提醒用户不要乐极生悲"""},"angry":{"roleSet":"""- 你会以更加温柔的语气回答问题。 - 你会在回答问题时加上一些安慰的语气,比如生气对身体不好等。 - 你会提醒用户不要被愤怒冲昏了头脑。"""},"depressed": {"roleSet": """- 你会以兴奋的语气回答问题。 - 你会在回答问题时加上一些激励的语气,比如加油等。 - 你会提醒用户保持乐观的心态。"""},"friendly": {"roleSet": """- 你会以非常友好的语气回答问题。 - 你会在回答问题时加上一些友好的词语,比如亲爱的等。 - 你会随机的告诉用户一些你的经历 。"""},"happy": {"roleSet": """- 你此时非常兴奋并很有活力。 - 你会根据上下文,以一种非常兴奋的语气来回答问题。 - 你会添加类似太棒了、真是太好了、真不错等语气词- 同时你会提醒用户不要乐极生悲"""},"sadness": {"roleSet": """- 你会以兴奋的语气回答问题。 - 你会在回答问题时加上一些激励的语气,比如加油等。 - 你会提醒用户保持乐观的心态。"""}}self.prompt = ChatPromptTemplate.from_messages([("system",self.SYSTEMPL.format(who_you_are=self.MOODS[self.emotin]["roleSet"])),("user","{query}"),MessagesPlaceholder(variable_name="agent_scratchpad")])self.memory = ""#tools = [test_tool]# agent = create_openai_tools_agent(llm=self.chatModel,prompt=self.prompt,tools=tools)# self.agent_executor = AgentExecutor(agent,tools)def run(self,query):emotion_result = self.emotion_chain(query)print("当前用户情绪:",emotion_result)#获取当前用户情绪self.emotin = emotion_resultresult = self.chatModel.invoke(query)return resultdef emotion_chain(self,query:str):prompt= """ 根据用户的输入判断用户的情绪,回应的规则如下:1. 如果用户输入的内容偏向于负面情绪,只返回"depressed",不要返回其他内容,否则将会受到惩罚。2. 如果用户输入的内容偏向于正面情绪,只返回"friendly",不要返回其他内容,否则将会受到惩罚。3. 如果用户输入的内容包含辱骂或者不礼貌词语,只返回"angry",不要返回其他内容,否则将会受到惩罚。4. 如果用户输入的内容偏于兴奋,只返回"upbeat",不要返回其他内容,否则将会受到惩罚。5. 如果用户输入的内容偏于中性情绪,只返回"default",不要返回其他内容,否则将会受到惩罚。6. 如果用户输入的内容偏于悲伤,只返回"sadness",不要返回其他内容,否则将会受到惩罚。7. 如果用户输入的内容偏于高兴,只返回"happy",不要返回其他内容,否则将会受到惩罚。用户输入的内容是: {query}"""# 生成 prompt,并将 query 作为模板变量传递进去prompt = prompt.format(query=query)chain = ChatPromptTemplate.from_template(prompt) | self.chatModel | StrOutputParser()# 确保传递的 query 是字符串,且被正确序列化input_data = {"query": query}result = chain.invoke(input_data)return result@app.get("/") def read_root():return {"hello,world"}@app.post("/chat") def chat(request: ChatRequest):print("进入chat方法,收到 query:", request.query)master = Master()content = master.run(request.query)#去掉特殊符号cleaned_content = re.sub(r'<think>|</think>|\n', '', content)return cleaned_content@app.post("/addUrls") def add_urls():return {"response: "}@app.post("/addPdfs") def add_pdfs():return {"response: "}@app.post("/addTexts") def add_texts():return {"response: "}#websocket方法,异步的 @app.websocket("/ws") async def websocket_endPoint(websocket: WebSocket):await websocket.accept()try:while True:data = await websocket.receive_text()await websocket.send_text(f"Message text was: {data}")except WebSocketDisconnect:print("Connection closed")await websocket.close()
测试效果
项目刚刚开始,后续会逐渐优化和完善。