目录
- 背景和价值
- 初始化(替换为你的 API Key)
- 参考资料
背景和价值
步骤 1:安装依赖
pip install dashscope langchain
步骤 2:创建自定义 ChatModel 类
from typing import Any, Dict, List, Optional, Union

from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langchain_core.outputs import ChatGeneration, ChatResult
import dashscope


class QwenChatModel(BaseChatModel):
    """LangChain ``BaseChatModel`` wrapper around Alibaba Qwen (DashScope).

    Incoming LangChain messages are converted to DashScope's
    ``{"role": ..., "content": ...}`` dictionaries, sent through
    ``dashscope.Generation.call`` and the first returned choice is wrapped
    in a ``ChatResult`` so that ``invoke()`` yields an ``AIMessage``.
    """

    # Required: Alibaba Cloud API key (obtained from the console).
    api_key: str
    # Model name, e.g. qwen-turbo / qwen-plus.
    model_name: str = "qwen-turbo"
    # Optional sampling temperature.
    temperature: Optional[float] = 0.8
    # Optional nucleus-sampling parameter.
    top_p: Optional[float] = 0.9

    def _generate(
        self,
        messages: List[Union[BaseMessage, Dict[str, Any]]],
        stop: Optional[List[str]] = None,
        run_manager: Optional[Any] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Call the DashScope generation API and wrap the reply.

        Args:
            messages: Conversation so far, as LangChain messages or as
                already-converted DashScope role/content dictionaries.
            stop: Optional stop sequences; forwarded to DashScope only when
                provided.
            run_manager: Callback manager supplied by LangChain; accepted
                for interface compatibility and not used here.
            **kwargs: Per-call overrides; ``temperature`` and ``top_p``
                take precedence over the instance defaults.

        Returns:
            A ``ChatResult`` holding one ``ChatGeneration`` whose message is
            the assistant reply.

        Raises:
            RuntimeError: If DashScope answers with a non-200 status code.
        """
        request: Dict[str, Any] = {
            "model": self.model_name,
            "messages": self._convert_messages_to_qwen_format(messages),
            # Without the key the SDK falls back to its global configuration,
            # silently ignoring the key given to this instance.
            "api_key": self.api_key,
            # The reply is read from ``output.choices`` below, which DashScope
            # fills in when the "message" result format is requested.
            "result_format": "message",
            "temperature": kwargs.get("temperature", self.temperature),
            "top_p": kwargs.get("top_p", self.top_p),
        }
        if stop:
            request["stop"] = stop

        response = dashscope.Generation.call(**request)

        if response.status_code != 200:
            raise RuntimeError(f"API 调用失败: {response}")

        content = response.output.choices[0].message["content"]
        generation = ChatGeneration(message=AIMessage(content=content))
        return ChatResult(generations=[generation])

    def _convert_messages_to_qwen_format(
        self,
        messages: List[Union[BaseMessage, Dict[str, Any]]],
    ) -> List[Dict[str, Any]]:
        """Convert LangChain messages to the Qwen API message format.

        Dictionaries are passed through unchanged so that callers which
        already hold DashScope-style messages keep working.

        Raises:
            ValueError: If a message is neither a dict nor a system, human
                or AI message; dropping it silently would change the
                conversation the model sees.
        """
        qwen_messages: List[Dict[str, Any]] = []
        for msg in messages:
            if isinstance(msg, dict):
                qwen_messages.append(msg)
                continue

            if isinstance(msg, SystemMessage):
                role = "system"
            elif isinstance(msg, HumanMessage):
                role = "user"
            elif isinstance(msg, AIMessage):
                role = "assistant"
            else:
                raise ValueError(
                    f"Unsupported message type: {type(msg).__name__}"
                )
            qwen_messages.append({"role": role, "content": msg.content})
        return qwen_messages

    def _call(
        self,
        messages: List[Union[SystemMessage, HumanMessage, AIMessage]],
        **kwargs: Any,
    ) -> str:
        """Return only the reply text for ``messages``.

        Convenience wrapper around ``_generate``, which performs the message
        conversion itself.
        """
        result = self._generate(messages, **kwargs)
        return result.generations[0].text

    @property
    def _llm_type(self) -> str:
        """Identifier LangChain uses for this model type."""
        return "qwen-chat"
步骤 3:初始化并调用模型
初始化(替换为你的 API Key)
# Instantiate the wrapper (replace the placeholder with your own API key).
qwen = QwenChatModel(api_key="sk-your-api-key-here")

# Conversation history: a system instruction followed by the user's request.
messages = [
    SystemMessage(content="你是一个诗人,用中文回答所有问题"),
    HumanMessage(content="写一首关于秋天的诗"),
]

# Send the conversation to the model and print whatever it returns.
response = qwen.invoke(messages)
print(response)