前言
过去半年,随着ChatGPT的火爆,直接带火了整个LLM这个方向,然LLM毕竟更多是基于过去的经验数据预训练而来,没法获取最新的知识,以及各企业私有的知识
- 为了获取最新的知识,ChatGPT plus版集成了bing搜索的功能,有的模型则会调用一个定位于 “链接各种AI模型、工具的langchain”的bing功能
- 为了处理企业私有的知识,要么基于开源模型微调,要么也可以通过langchain作为一种外挂的内部知识库 (类似存在本地的数据库一样)
所以越来越多的人开始关注langchain并把它与LLM结合起来应用,更直接推动了数据库、知识图谱与LLM的结合应用
本文侧重讲解
- LLM与langchain/数据库/知识图谱的结合应用,且解读langchain-ChatGLM项目的关键源码,不只是把它当做一个工具使用,因为对工具的原理更了解,则对工具的使用更顺畅
- 其中,解读langchain-ChatGLM项目源码其实不易,因为涉及的项目、技术点不少,所以一开始容易绕晕,好在根据该项目的流程一步步抽丝剥茧之后,给大家呈现了清晰的代码架构
过程中,我从接触该langchain-ChatGLM项目到整体源码梳理清晰并写清楚历时了近一周,而大家有了本文之后,可能不到一天便可以理清了(提升近7倍效率) ,这便是本文的价值和意义之一
阅读过程中若有任何问题,欢迎随时留言,会一一及时回复/解答,共同探讨、共同深挖
第一部分 什么是LangChain:LLM的外挂/功能库
1.1 langchain的整体组成架构
通俗讲,所谓langchain (官网地址、GitHub地址),即把AI中常用的很多功能都封装成库,且有调用各种商用模型API、开源模型的接口,支持以下各种组件
初次接触的朋友一看这么多组件可能直接晕了(封装的东西非常多,感觉它想把LLM所需要用到的功能/工具都封装起来),为方便理解,我们可以先从大的层面把整个langchain库划分为三个大层:基础层、能力层、应用层
1.1.1 基础层:models、LLMs、index
- Models:模型
各种类型的模型和模型集成,比如OpenAI的各个API/GPT-4等等,为各种不同基础模型提供统一接口
比如通过API完成一次问答import os os.environ["OPENAI_API_KEY"] = '你的api key' from langchain.llms import OpenAIllm = OpenAI(model_name="text-davinci-003",max_tokens=1024) llm("怎么评价人工智能")
- LLMS层
这一层主要强调对models层能力的封装以及服务化输出能力,主要有:
各类LLM模型管理平台:强调的模型的种类丰富度以及易用性
一体化服务能力产品:强调开箱即用
差异化能力:比如聚焦于Promp管理(包括提示管理、提示优化和提示序列化)、基于共享资源的模型运行模式等等
比如Google's PaLM Text APIs,再比如 llms/openai.py 文件下model_token_mapping = {"gpt-4": 8192,"gpt-4-0314": 8192,"gpt-4-0613": 8192,"gpt-4-32k": 32768,"gpt-4-32k-0314": 32768,"gpt-4-32k-0613": 32768,"gpt-3.5-turbo": 4096,"gpt-3.5-turbo-0301": 4096,"gpt-3.5-turbo-0613": 4096,"gpt-3.5-turbo-16k": 16385,"gpt-3.5-turbo-16k-0613": 16385,"text-ada-001": 2049,"ada": 2049,"text-babbage-001": 2040,"babbage": 2049,"text-curie-001": 2049,"curie": 2049,"davinci": 2049,"text-davinci-003": 4097,"text-davinci-002": 4097,"code-davinci-002": 8001,"code-davinci-001": 8001,"code-cushman-002": 2048,"code-cushman-001": 2048,}
- Index:索引
对用户私域文本、图片、PDF等各类文档进行存储和检索(相当于结构化文档,以便让外部数据和模型交互),具体实现上有两个方案:
Vector方案:即对文件先切分为Chunks,在按Chunks分别编码存储并检索,可参考此代码文件:https://github.com/hwchase17/langchain/blob/master/langchain/indexes/vectorstore.py
KG方案:这部分利用LLM抽取文件中的三元组,将其存储为KG供后续检索,可参考此代码文件:https://github.com/hwchase17/langchain/blob/master/langchain/indexes/graph.py
Document Loaders,文档加载的标准接口
与各种格式的文档及数据源集成,比如 Email、Markdown、PDF (所以可以做类似ChatPDF这样的应用)、Youtube …
1.1.2 能力层:Chains、Memory、Tools
如果基础层提供了最核心的能力,能力层则给这些能力安装上手、脚、脑,让其具有记忆和触发万物的能力,包括:Chains、Memory、Tool三部分
- Chains:链接
简言之,相当于包括一系列对各种组件的调用,可能是一个 Prompt 模板,一个语言模型,一个输出解析器,一起工作处理用户的输入,生成响应,并处理输出
具体而言,则相当于按照不同的需求抽象并定制化不同的执行逻辑,Chain可以相互嵌套并串行执行,通过这一层,让LLM的能力链接到各行各业
比如elasticsearch_database
比如graph_qa
比如能自动生成代码并执行的llm_math等等
比如面向私域数据的qa_with_sources
比如面向SQL数据源的sql_database
另外,还有比较让人眼前一亮的有:
constitutional_ai:对最终结果进行偏见、合规问题处理的逻辑,保证最终的结果符合价值观
llm_checker:能让LLM自动检测自己的输出是否有没有问题的逻辑 - Memory:记忆
简言之,用来保存和模型交互时的上下文状态,处理长期记忆
具体而言,这层主要有两个核心点:
对Chains的执行过程中的输入、输出进行记忆并结构化存储,为下一步的交互提供上下文,这部分简单存储在Redis即可
根据交互历史构建知识图谱,根据关联信息给出准确结果 - Tools层,工具
其实Chains层可以根据LLM + Prompt执行一些特定的逻辑,但是如果要用Chain实现所有的逻辑不现实,可以通过Tools层也可以实现,Tools层理解为技能比较合理,典型的比如搜索、Wikipedia、天气预报、ChatGPT服务等等
1.1.3 应用层:Agents
- Agents:代理
简言之,有了基础层和能力层,我们可以构建各种各样好玩的,有价值的服务,这里就是Agent
具体而言,Agent 作为代理人去向 LLM 发出请求,然后采取行动,且检查结果直到工作完成,包括LLM无法处理的任务的代理 (例如搜索或计算,类似ChatGPT plus的插件有调用bing和计算器的功能)
比如,Agent 可以使用维基百科查找 Barack Obama 的出生日期,然后使用计算器计算他在 2023 年的年龄
...# pip install wikipedia from langchain.agents import load_tools from langchain.agents import initialize_agent from langchain.agents import AgentTypetools = load_tools(["wikipedia", "llm-math"], llm=llm) agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)agent.run("奥巴马的生日是哪天? 到2023年他多少岁了?")
最终langchain的整体技术架构可以如下图所示 (查看高清大图,此外,这里还有另一个架构图)
1.2 langchain的部分应用示例:联网搜索 + 文档问答
但看理论介绍,你可能没法理解langchain到底有什么用,为方便大家理解,特举几个langchain的应用示例
1.2.1 通过 Google 搜索并返回答案
由于需要借助 Serpapi 来进行实现,而Serpapi 提供了 Google 搜索的API 接口
故先到 Serpapi 官网(https://serpapi.com/)上注册一个用户,并复制他给我们生成 API key,然后设置到环境变量里面去
import os
os.environ["OPENAI_API_KEY"] = '你的api key'
os.environ["SERPAPI_API_KEY"] = '你的api key'
然后,开始编写代码
from langchain.agents import load_tools
from langchain.agents import initialize_agent
from langchain.llms import OpenAI
from langchain.agents import AgentType# 加载 OpenAI 模型
llm = OpenAI(temperature=0,max_tokens=2048) # 加载 serpapi 工具
tools = load_tools(["serpapi"])# 如果搜索完想再计算一下可以这么写
# tools = load_tools(['serpapi', 'llm-math'], llm=llm)# 如果搜索完想再让他再用python的print做点简单的计算,可以这样写
# tools=load_tools(["serpapi","python_repl"])# 工具加载后都需要初始化,verbose 参数为 True,会打印全部的执行详情
agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)# 运行 agent
agent.run("What's the date today? What great events have taken place today in history?")
1.2.2 用不到 50 行代码实现一个文档对话机器人
众所周知,由于ChatGPT训练的数据只更新到 2021 年,因此它不知道互联网最新的知识(除非它调用搜索功能bing),而利用 “LangChain + ChatGPT的API” 则可以用不到 50 行的代码然后实现一个和既存文档的对话机器人
假设所有 2022 年更新的内容都存在于 2022.txt 这个文档中,那么通过如下的代码,就可以让 ChatGPT 来支持回答 2022 年的问题
其中原理也很简单:
- 对用户的输入/prompt向量化
- 文档分词
- 文档分割
- 文本向量化
向量化了才能进行向量之间相似度的计算 - 向量化的文本存到向量数据库里
- 根据用户的输入/prompt去向量数据里寻找答案(答案的判定是基于prompt/输入与文本中相关段落向量的相似性匹配)
- 最后通过LLM返回答案
#!/usr/bin/python
# -*- coding: UTF-8 -*-import os # 导入os模块,用于操作系统相关的操作
import jieba as jb # 导入结巴分词库
from langchain.chains import ConversationalRetrievalChain # 导入用于创建对话检索链的类
from langchain.chat_models import ChatOpenAI # 导入用于创建ChatOpenAI对象的类
from langchain.document_loaders import DirectoryLoader # 导入用于加载文件的类
from langchain.embeddings import OpenAIEmbeddings # 导入用于创建词向量嵌入的类
from langchain.text_splitter import TokenTextSplitter # 导入用于分割文档的类
from langchain.vectorstores import Chroma # 导入用于创建向量数据库的类# 初始化函数,用于处理输入的文档
def init(): files = ['2022.txt'] # 需要处理的文件列表for file in files: # 遍历每个文件with open(f"./data/{file}", 'r', encoding='utf-8') as f: # 以读模式打开文件data = f.read() # 读取文件内容cut_data = " ".join([w for w in list(jb.cut(data))]) # 对读取的文件内容进行分词处理cut_file = f"./data/cut/cut_{file}" # 定义处理后的文件路径和名称with open(cut_file, 'w') as f: # 以写模式打开文件f.write(cut_data) # 将处理后的内容写入文件# 新建一个函数用于加载文档
def load_documents(directory): # 创建DirectoryLoader对象,用于加载指定文件夹内的所有.txt文件loader = DirectoryLoader(directory, glob='**/*.txt') docs = loader.load() # 加载文件return docs # 返回加载的文档# 新建一个函数用于分割文档
def split_documents(docs): # 创建TokenTextSplitter对象,用于分割文档text_splitter = TokenTextSplitter(chunk_size=1000, chunk_overlap=0) docs_texts = text_splitter.split_documents(docs) # 分割加载的文本return docs_texts # 返回分割后的文本# 新建一个函数用于创建词嵌入
def create_embeddings(api_key): # 创建OpenAIEmbeddings对象,用于获取OpenAI的词向量embeddings = OpenAIEmbeddings(openai_api_key=api_key) return embeddings # 返回创建的词嵌入# 新建一个函数用于创建向量数据库
def create_chroma(docs_texts, embeddings, persist_directory): # 使用文档,embeddings和持久化目录创建Chroma对象vectordb = Chroma.from_documents(docs_texts, embeddings, persist_directory=persist_directory) vectordb.persist() # 持久化存储向量数据return vectordb # 返回创建的向量数据库# load函数,调用上面定义的具有各个职责的函数
def load():docs = load_documents('./data/cut') # 调用load_documents函数加载文档docs_texts = split_documents(docs) # 调用split_documents函数分割文档api_key = os.environ.get('OPENAI_API_KEY') # 从环境变量中获取OpenAI的API密钥embeddings = create_embeddings(api_key) # 调用create_embeddings函数创建词嵌入# 调用create_chroma函数创建向量数据库vectordb = create_chroma(docs_texts, embeddings, './data/cut/') # 创建ChatOpenAI对象,用于进行聊天对话openai_ojb = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo") # 从模型和向量检索器创建ConversationalRetrievalChain对象chain = ConversationalRetrievalChain.from_llm(openai_ojb, vectordb.as_retriever()) return chain # 返回该对象# 调用load函数,获取ConversationalRetrievalChain对象
chain = load() # 定义一个函数,根据输入的问题获取答案
def get_ans(question): chat_history = [] # 初始化聊天历史为空列表result = chain({ # 调用chain对象获取聊天结果'chat_history': chat_history, # 传入聊天历史'question': question, # 传入问题})return result['answer'] # 返回获取的答案if __name__ == '__main__': # 如果此脚本作为主程序运行s = input('please input:') # 获取用户输入while s != 'exit': # 如果用户输入的不是'exit'ans = get_ans(s) # 调用get_ans函数获取答案print(ans) # 打印答案s = input('please input:') # 获取用户输入
//待更
第二部分 基于LangChain + ChatGLM-6B的本地知识库问答
2.1 通过LangChain+LLM实现本地知识库问答的核心步骤
GitHub上有一个利用 langchain 思想实现的基于本地知识库的问答应用,目标期望建立一套对中文场景与开源模型支持友好、可离线运行的知识库问答解决方案,这是其GitHub地址
- 该项目受 GanymedeNil 的项目 document.ai,和 AlexZhangji 创建的 ChatGLM-6B Pull Request 启发,建立了全流程可使用开源模型实现的本地知识库问答应用。现已支持使用 ChatGLM-6B、 ClueAI/ChatYuan-large-v2 等大语言模型的接入
- 该项目中 Embedding 默认选用的是 GanymedeNil/text2vec-large-chinese,LLM 默认选用的是 ChatGLM-6B,依托上述模型,本项目可实现全部使用开源模型离线私有部署
本项目实现原理如下图所示 (与基于文档的问答 大同小异,过程包括:1 加载文档 -> 2 读取文档 -> 3/4文档分割 -> 5/6 文本向量化 -> 8/9 问句向量化 -> 10 在文档向量中匹配出与问句向量最相似的top k个 -> 11/12/13 匹配出的文本作为上下文和问题一起添加到prompt中 -> 14/15提交给LLM生成回答 )
-
第一阶段:加载文件-读取文件-文本分割(Text splitter)
加载文件:这是读取存储在本地的知识库文件的步骤
读取文件:读取加载的文件内容,通常是将其转化为文本格式
文本分割(Text splitter):按照一定的规则(例如段落、句子、词语等)将文本分割,以下只是示例代码(非langchain-ChatGLM项目的源码)def _load_file(self, filename):# 判断文件类型if filename.lower().endswith(".pdf"): # 如果文件是 PDF 格式loader = UnstructuredFileLoader(filename) # 使用 UnstructuredFileLoader 加载器来加载 PDF 文件text_splitor = CharacterTextSplitter() # 使用 CharacterTextSplitter 来分割文件中的文本docs = loader.load_and_split(text_splitor) # 加载文件并进行文本分割else: # 如果文件不是 PDF 格式loader = UnstructuredFileLoader(filename, mode="elements") # 使用 UnstructuredFileLoader 加载器以元素模式加载文件text_splitor = CharacterTextSplitter() # 使用 CharacterTextSplitter 来分割文件中的文本docs = loader.load_and_split(text_splitor) # 加载文件并进行文本分割return docs # 返回处理后的文件数据
-
第二阶段:文本向量化(embedding)-存储到向量数据库
文本向量化(embedding):这通常涉及到NLP的特征抽取,可以通过诸如TF-IDF、word2vec、BERT等方法将分割好的文本转化为数值向量# 初始化方法,接受一个可选的模型名称参数,默认值为 Nonedef __init__(self, model_name=None) -> None: if not model_name: # 如果没有提供模型名称# 使用默认的嵌入模型# 创建一个 HuggingFaceEmbeddings 对象,模型名称为类的 model_name 属性self.embeddings = HuggingFaceEmbeddings(model_name=self.model_name)
存储到向量数据库:文本向量化之后存储到数据库vectorstore(FAISS)
def init_vector_store(self):persist_dir = os.path.join(VECTORE_PATH, ".vectordb") # 持久化向量数据库的地址print("向量数据库持久化地址: ", persist_dir) # 打印持久化地址# 如果持久化地址存在if os.path.exists(persist_dir): # 从本地持久化文件中加载print("从本地向量加载数据...")# 使用 Chroma 加载持久化的向量数据vector_store = Chroma(persist_directory=persist_dir, embedding_function=self.embeddings) # 如果持久化地址不存在else: # 加载知识库documents = self.load_knownlege() # 使用 Chroma 从文档中创建向量存储vector_store = Chroma.from_documents(documents=documents, embedding=self.embeddings,persist_directory=persist_dir) vector_store.persist() # 持久化向量存储return vector_store # 返回向量存储
其中load_knownlege的实现为
def load_knownlege(self):docments = [] # 初始化一个空列表来存储文档# 遍历 DATASETS_DIR 目录下的所有文件for root, _, files in os.walk(DATASETS_DIR, topdown=False):for file in files:filename = os.path.join(root, file) # 获取文件的完整路径docs = self._load_file(filename) # 加载文件中的文档# 更新 metadata 数据new_docs = [] # 初始化一个空列表来存储新文档for doc in docs:# 更新文档的 metadata,将 "source" 字段的值替换为不包含 DATASETS_DIR 的相对路径doc.metadata = {"source": doc.metadata["source"].replace(DATASETS_DIR, "")} print("文档2向量初始化中, 请稍等...", doc.metadata) # 打印正在初始化的文档的 metadatanew_docs.append(doc) # 将文档添加到新文档列表docments += new_docs # 将新文档列表添加到总文档列表return docments # 返回所有文档的列表
-
第三阶段:问句向量化
这是将用户的查询或问题转化为向量,应使用与文本向量化相同的方法,以便在相同的空间中进行比较 -
第四阶段:在文本向量中匹配出与问句向量最相似的top k个
这一步是信息检索的核心,通过计算余弦相似度、欧氏距离等方式,找出与问句向量最接近的文本向量def query(self, q):"""在向量数据库中查找与问句向量相似的文本向量"""vector_store = self.init_vector_store()docs = vector_store.similarity_search_with_score(q, k=self.top_k)for doc in docs:dc, s = docyield s, dc
-
第五阶段:匹配出的文本作为上下文和问题一起添加到prompt中
这是利用匹配出的文本来形成与问题相关的上下文,用于输入给语言模型 -
第六阶段:提交给LLM生成回答
最后,将这个问题和上下文一起提交给语言模型(例如GPT系列),让它生成回答
比如知识查询(代码来源)class KnownLedgeBaseQA:# 初始化def __init__(self) -> None:k2v = KnownLedge2Vector() # 创建一个知识到向量的转换器self.vector_store = k2v.init_vector_store() # 初始化向量存储self.llm = VicunaLLM() # 创建一个 VicunaLLM 对象# 获得与查询相似的答案def get_similar_answer(self, query):# 创建一个提示模板prompt = PromptTemplate(template=conv_qa_prompt_template, input_variables=["context", "question"] # 输入变量包括 "context"(上下文) 和 "question"(问题))# 使用向量存储来检索文档retriever = self.vector_store.as_retriever(search_kwargs={"k": VECTOR_SEARCH_TOP_K}) docs = retriever.get_relevant_documents(query=query) # 获取与查询相关的文本context = [d.page_content for d in docs] # 从文本中提取出内容result = prompt.format(context="\n".join(context), question=query) # 格式化模板,并用从文本中提取出的内容和问题填充return result # 返回结果
如你所见,这种通过组合langchain+LLM的方式,特别适合一些垂直领域或大型集团企业搭建通过LLM的智能对话能力搭建企业内部的私有问答系统,也适合个人专门针对一些英文paper进行问答,比如比较火的一个开源项目:ChatPDF,其从文档处理角度来看,实现流程如下(图源):
2.2 langchain + ChatGLM-6B搭建本地知识库问答项目的部署
2.2.1 部署过程一:支持多种使用模式
其中的LLM模型可以根据实际业务的需求选定,本项目中用的ChatGLM-6B,其GitHub地址为:https://github.com/THUDM/ChatGLM-6B
ChatGLM-6B 是⼀个开源的、⽀持中英双语的对话语⾔模型,基于 General LanguageModel (GLM) 架构,具有 62 亿参数。结合模型量化技术,用户可以在消费级的显卡上进行本地部署(INT4 量化级别下最低只需 6GB 显存)
ChatGLM-6B 使用了和 ChatGPT 相似的技术,针对中文问答和对话进行了优化。经过约 1T 标识符的中英双语训练,辅以监督微调、反馈自助、人类反馈强化学习等技术的加持,62 亿参数的 ChatGLM-6B 已经能生成相当符合人类偏好的回答
- 新建一个python3.8.13的环境(模型文件还是可以用的)
conda create -n langchain python==3.8.13
- 拉取项目
git clone https://github.com/imClumsyPanda/langchain-ChatGLM.git
- 进入目录
cd langchain-ChatGLM
- 安装requirements.txt
conda activate langchain pip install -r requirements.txt
- 当前环境支持装langchain的最高版本是0.0.166,无法安装0.0.174,就先装下0.0.166试下
修改配置文件路径:vi configs/model_config.py
- 将chatglm-6b的路径设置成自己的
“chatglm-6b”: { “name”: “chatglm-6b”, “pretrained_model_name”: “/data/sim_chatgpt/chatglm-6b”, “local_model_path”: None, “provides”: “ChatGLM”
- 修改要运行的代码文件:webui.py
vi webui.py
- 将最后launch函数中的share设置为True,inbrowser设置为True
- 执行webui.py文件
可能是网络问题,无法创建一个公用链接。可以进行云服务器和本地端口的映射,参考:https://www.cnblogs.com/monologuesmw/p/14465117.htmlpython webui.py
对应输出:
占用显存情况:大约15个G
2.2.2 部署过程二:支持多种社区上的在线体验
项目地址:https://github.com/thomas-yanxin/LangChain-ChatGLM-Webui
HUggingFace社区在线体验:https://huggingface.co/spaces/thomas-yanxin/LangChain-ChatLLM
另外也支持ModelScope魔搭社区、飞桨AIStudio社区等在线体验
- 下载项目
git clone https://github.com/thomas-yanxin/LangChain-ChatGLM-Webui.git
- 进入目录
cd LangChain-ChatGLM-Webui
- 安装所需的包
pip install -r requirements.txt pip install gradio==3.10
- 修改config.py
init_llm = "ChatGLM-6B"llm_model_dict = {"chatglm": {"ChatGLM-6B": "/data/sim_chatgpt/chatglm-6b",
- 修改app.py文件,将launch函数中的share设置为True,inbrowser设置为True
执行webui.py文件python webui.py
显存占用约13G
第三部分 逐行深入分析:langchain-ChatGLM项目的源码解读
再回顾一遍langchain-ChatGLM这个项目的架构图(图源)
你会发现该项目主要由以下各大模块组成
- chains: 工作链路实现,如 chains/local_doc_qa 实现了基于本地⽂档的问答实现
- configs:配置文件存储
- knowledge_bas/content:用于存储上传的原始⽂件
- loader: 文档加载器的实现类
- models: llm的接⼝类与实现类,针对开源模型提供流式输出⽀持
- textsplitter: 文本切分的实现类
- vectorstores:用于存储向量库⽂件,即本地知识库本体
- ..
接下来,为方便读者一目了然,更快理解
- 我基本给“下面该项目中的每一行代码”都添加上了中文注释
- 且为理解更顺畅,我解读各个代码文件夹的顺序是根据项目流程逐一展开的 (而非上图GitHub上各个代码文件夹的呈现顺序)
如有问题,可以随时留言评论
3.1 agent:custom_agent/bing_search
3.1.1 agent/custom_agent.py
from langchain.agents import Tool # 导入工具模块
from langchain.tools import BaseTool # 导入基础工具类
from langchain import PromptTemplate, LLMChain # 导入提示模板和语言模型链
from agent.custom_search import DeepSearch # 导入自定义搜索模块# 导入基础单动作代理,输出解析器,语言模型单动作代理和代理执行器
from langchain.agents import BaseSingleActionAgent, AgentOutputParser, LLMSingleActionAgent, AgentExecutor
from typing import List, Tuple, Any, Union, Optional, Type # 导入类型注释模块
from langchain.schema import AgentAction, AgentFinish # 导入代理动作和代理完成模式
from langchain.prompts import StringPromptTemplate # 导入字符串提示模板
from langchain.callbacks.manager import CallbackManagerForToolRun # 导入工具运行回调管理器
from langchain.base_language import BaseLanguageModel # 导入基础语言模型
import re # 导入正则表达式模块# 定义一个代理模板字符串
agent_template = """
你现在是一个{role}。这里是一些已知信息:
{related_content}
{background_infomation}
{question_guide}:{input}{answer_format}
"""# 定义一个自定义提示模板类,继承自字符串提示模板
class CustomPromptTemplate(StringPromptTemplate):template: str # 提示模板字符串tools: List[Tool] # 工具列表# 定义一个格式化函数,根据提供的参数生成最终的提示模板def format(self, **kwargs) -> str:intermediate_steps = kwargs.pop("intermediate_steps")# 判断是否有互联网查询信息if len(intermediate_steps) == 0:# 如果没有,则给出默认的背景信息,角色,问题指导和回答格式background_infomation = "\n"role = "傻瓜机器人"question_guide = "我现在有一个问题"answer_format = "如果你知道答案,请直接给出你的回答!如果你不知道答案,请你只回答\"DeepSearch('搜索词')\",并将'搜索词'替换为你认为需要搜索的关键词,除此之外不要回答其他任何内容。\n\n下面请回答我上面提出的问题!"else:# 否则,根据 intermediate_steps 中的 AgentAction 拼装 background_infomationbackground_infomation = "\n\n你还有这些已知信息作为参考:\n\n"action, observation = intermediate_steps[0]background_infomation += f"{observation}\n"role = "聪明的 AI 助手"question_guide = "请根据这些已知信息回答我的问题"answer_format = ""kwargs["background_infomation"] = background_infomationkwargs["role"] = rolekwargs["question_guide"] = question_guidekwargs["answer_format"] = answer_formatreturn self.template.format(**kwargs) # 格式化模板并返回# 定义一个自定义搜索工具类,继承自基础工具类
class CustomSearchTool(BaseTool):name: str = "DeepSearch" # 工具名称description: str = "" # 工具描述# 定义一个运行函数,接受一个查询字符串和一个可选的回调管理器作为参数,返回DeepSearch的搜索结果def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None):return DeepSearch.search(query = query)# 定义一个异步运行函数,但由于DeepSearch不支持异步,所以直接抛出一个未实现错误async def _arun(self, query: str):raise NotImplementedError("DeepSearch does not support async")# 定义一个自定义代理类,继承自基础单动作代理
class CustomAgent(BaseSingleActionAgent):# 定义一个输入键的属性@propertydef input_keys(self):return ["input"]# 定义一个计划函数,接受一组中间步骤和其他参数,返回一个代理动作或者代理完成def plan(self, intermedate_steps: List[Tuple[AgentAction, str]],**kwargs: Any) -> Union[AgentAction, AgentFinish]:return AgentAction(tool="DeepSearch", tool_input=kwargs["input"], log="")# 定义一个自定义输出解析器,继承自代理输出解析器
class CustomOutputParser(AgentOutputParser):# 定义一个解析函数,接受一个语言模型的输出字符串,返回一个代理动作或者代理完成def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:# 使用正则表达式匹配输出字符串,group1是调用函数名字,group2是传入参数match = re.match(r'^[\s\w]*(DeepSearch)\(([^\)]+)\)', llm_output, re.DOTALL)print(match)# 如果语言模型没有返回 DeepSearch() 则认为直接结束指令if not match:return AgentFinish(return_values={"output": llm_output.strip()},log=llm_output,)# 否则的话都认为需要调用 Toolelse:action = match.group(1).strip()action_input = match.group(2).strip()return AgentAction(tool=action, tool_input=action_input.strip(" ").strip('"'), log=llm_output)# 定义一个深度代理类
class DeepAgent:tool_name: str = "DeepSearch" # 工具名称agent_executor: any # 代理执行器tools: List[Tool] # 工具列表llm_chain: any # 语言模型链# 定义一个查询函数,接受一个相关内容字符串和一个查询字符串,返回执行器的运行结果def query(self, related_content: str = "", query: str = ""):tool_name =这段代码的主要目的是建立一个深度搜索的AI代理。AI代理首先通过接收一个问题输入,然后根据输入生成一个提示模板,然后通过该模板引导AI生成回答或进行更深入的搜索。现在,我将继续为剩余的代码添加中文注释```pythonself.tool_nameresult = self.agent_executor.run(related_content=related_content, input=query ,tool_name=self.tool_name)return result # 返回执行器的运行结果# 在初始化函数中,首先从DeepSearch工具创建一个工具实例,并添加到工具列表中def __init__(self, llm: BaseLanguageModel, **kwargs):tools = [Tool.from_function(func=DeepSearch.search,name="DeepSearch",description="")]self.tools = tools # 保存工具列表tool_names = [tool.name for tool in tools] # 提取工具列表中的工具名称output_parser = CustomOutputParser() # 创建一个自定义输出解析器实例# 创建一个自定义提示模板实例prompt = CustomPromptTemplate(template=agent_template,tools=tools,input_variables=["related_content","tool_name", "input", "intermediate_steps"])# 创建一个语言模型链实例llm_chain = LLMChain(llm=llm, prompt=prompt)self.llm_chain = llm_chain # 保存语言模型链实例# 创建一个语言模型单动作代理实例agent = LLMSingleActionAgent(llm_chain=llm_chain,output_parser=output_parser,stop=["\nObservation:"],allowed_tools=tool_names)# 创建一个代理执行器实例agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True)self.agent_executor = agent_executor # 保存代理执行器实例
3.1.2 agent/bing_search.py
#coding=utf8
# 声明文件编码格式为 utf8from langchain.utilities import BingSearchAPIWrapper
# 导入 BingSearchAPIWrapper 类,这个类用于与 Bing 搜索 API 进行交互from configs.model_config import BING_SEARCH_URL, BING_SUBSCRIPTION_KEY
# 导入配置文件中的 Bing 搜索 URL 和 Bing 订阅密钥def bing_search(text, result_len=3):# 定义一个名为 bing_search 的函数,该函数接收一个文本和结果长度的参数,默认结果长度为3if not (BING_SEARCH_URL and BING_SUBSCRIPTION_KEY):# 如果 Bing 搜索 URL 或 Bing 订阅密钥未设置,则返回一个错误信息的文档return [{"snippet": "please set BING_SUBSCRIPTION_KEY and BING_SEARCH_URL in os ENV","title": "env inof not fould","link": "https://python.langchain.com/en/latest/modules/agents/tools/examples/bing_search.html"}]search = BingSearchAPIWrapper(bing_subscription_key=BING_SUBSCRIPTION_KEY,bing_search_url=BING_SEARCH_URL)# 创建 BingSearchAPIWrapper 类的实例,该实例用于与 Bing 搜索 API 进行交互return search.results(text, result_len)# 返回搜索结果,结果的数量由 result_len 参数决定if __name__ == "__main__":# 如果这个文件被直接运行,而不是被导入作为模块,那么就执行以下代码r = bing_search('python')# 使用 Bing 搜索 API 来搜索 "python" 这个词,并将结果保存在变量 r 中print(r)# 打印出搜索结果
3.2 models:包含models和文档加载器loader
- models: llm的接⼝类与实现类,针对开源模型提供流式输出⽀持
- loader: 文档加载器的实现类
3.2.1 models/chatglm_llm.py
from abc import ABC # 导入抽象基类
from langchain.llms.base import LLM # 导入语言学习模型基类
from typing import Optional, List # 导入类型标注模块
from models.loader import LoaderCheckPoint # 导入模型加载点
from models.base import (BaseAnswer, # 导入基本回答模型AnswerResult) # 导入回答结果模型class ChatGLM(BaseAnswer, LLM, ABC): # 定义ChatGLM类,继承基础回答、语言学习模型和抽象基类max_token: int = 10000 # 最大的token数temperature: float = 0.01 # 温度参数,用于控制生成文本的随机性top_p = 0.9 # 排序前0.9的token会被保留checkPoint: LoaderCheckPoint = None # 检查点模型# history = [] # 历史记录history_len: int = 10 # 历史记录长度def __init__(self, checkPoint: LoaderCheckPoint = None): # 初始化方法super().__init__() # 调用父类的初始化方法self.checkPoint = checkPoint # 赋值检查点模型@propertydef _llm_type(self) -> str: # 定义只读属性_llm_type,返回语言学习模型的类型return "ChatGLM"@propertydef _check_point(self) -> LoaderCheckPoint: # 定义只读属性_check_point,返回检查点模型return self.checkPoint@propertydef _history_len(self) -> int: # 定义只读属性_history_len,返回历史记录的长度return self.history_lendef set_history_len(self, history_len: int = 10) -> None: # 设置历史记录长度self.history_len = history_lendef _call(self, prompt: str, stop: Optional[List[str]] = None) -> str: # 定义_call方法,实现模型的具体调用print(f"__call:{prompt}") # 打印调用的提示信息response, _ = self.checkPoint.model.chat( # 调用模型的chat方法,获取回答和其他信息self.checkPoint.tokenizer, # 使用的分词器prompt, # 提示信息history=[], # 历史记录max_length=self.max_token, # 最大长度temperature=self.temperature # 温度参数)print(f"response:{response}") # 打印回答信息print(f"+++++++++++++++++++++++++++++++++++") # 打印分隔线return response # 返回回答def generatorAnswer(self, prompt: str,history: List[List[str]] = [],streaming: bool = False): # 定义生成回答的方法,可以处理流式输入if streaming: # 如果是流式输入history += [[]] # 在历史记录中添加新的空列表for inum, (stream_resp, _) in enumerate(self.checkPoint.model.stream_chat( # 对模型的stream_chat方法返回的结果进行枚举self.checkPoint.tokenizer, # 使用的分词器prompt, # 提示信息history=history[-self.history_len:-1] if self.history_len > 1 else [], # 使用的历史记录max_length=self.max_token, # 最大长度temperature=self.temperature # 温度参数)):# self.checkPoint.clear_torch_cache() # 清空缓存history[-1] = [prompt, stream_resp] # 更新最后一个历史记录answer_result = AnswerResult() # 创建回答结果对象answer_result.history = history # 更新回答结果的历史记录answer_result.llm_output = {"answer": stream_resp} # 更新回答结果的输出yield answer_result # 生成回答结果else: # 如果不是流式输入response, _ = self.checkPoint.model.chat( # 调用模型的chat方法,获取回答和其他信息self.checkPoint.tokenizer, # 使用的分词器prompt, # 提示信息history=history[-self.history_len:] if self.history_len > 0 else [], # 使用的历史记录max_length=self.max_token, # 最大长度temperature=self.temperature # 温度参数)self.checkPoint.clear_torch_cache() # 清空缓存history += [[prompt, response]] # 更新历史记录answer_result = AnswerResult() # 创建回答结果对象answer_result.history = history # 更新回答结果的历史记录answer_result.llm_output = {"answer": response} # 更新回答结果的输出yield answer_result # 生成回答结果
3.2.2 models/shared.py
这个文件的作用是远程调用LLM
import sys # 导入sys模块,通常用于与Python解释器进行交互
from typing import Any # 从typing模块导入Any,用于表示任何类型# 从models.loader.args模块导入parser,可能是解析命令行参数用
from models.loader.args import parser
# 从models.loader模块导入LoaderCheckPoint,可能是模型加载点
from models.loader import LoaderCheckPoint # 从configs.model_config模块导入llm_model_dict和LLM_MODEL
from configs.model_config import (llm_model_dict, LLM_MODEL)
# 从models.base模块导入BaseAnswer,即模型的基础类
from models.base import BaseAnswer # 定义一个名为loaderCheckPoint的变量,类型为LoaderCheckPoint,并初始化为None
loaderCheckPoint: LoaderCheckPoint = None def loaderLLM(llm_model: str = None, no_remote_model: bool = False, use_ptuning_v2: bool = False) -> Any:"""初始化 llm_model_ins LLM:param llm_model: 模型名称:param no_remote_model: 是否使用远程模型,如果需要加载本地模型,则添加 `--no-remote-model:param use_ptuning_v2: 是否使用 p-tuning-v2 PrefixEncoder:return:"""pre_model_name = loaderCheckPoint.model_name # 获取loaderCheckPoint的模型名称llm_model_info = llm_model_dict[pre_model_name] # 从模型字典中获取模型信息if no_remote_model: # 如果不使用远程模型loaderCheckPoint.no_remote_model = no_remote_model # 将loaderCheckPoint的no_remote_model设置为Trueif use_ptuning_v2: # 如果使用p-tuning-v2loaderCheckPoint.use_ptuning_v2 = use_ptuning_v2 # 将loaderCheckPoint的use_ptuning_v2设置为Trueif llm_model: # 如果指定了模型名称llm_model_info = llm_model_dict[llm_model] # 从模型字典中获取指定的模型信息if loaderCheckPoint.no_remote_model: # 如果不使用远程模型loaderCheckPoint.model_name = llm_model_info['name'] # 将loaderCheckPoint的模型名称设置为模型信息中的nameelse: # 如果使用远程模型loaderCheckPoint.model_name = llm_model_info['pretrained_model_name'] # 将loaderCheckPoint的模型名称设置为模型信息中的pretrained_model_nameloaderCheckPoint.model_path = llm_model_info["local_model_path"] # 设置模型的本地路径if 'FastChatOpenAILLM' in llm_model_info["provides"]: # 如果模型信息中的provides包含'FastChatOpenAILLM'loaderCheckPoint.unload_model() # 卸载模型else: # 如果不包含loaderCheckPoint.reload_model() # 重新加载模型provides_class = getattr(sys.modules['models'], llm_model_info['provides']) # 获取模型类modelInsLLM = provides_class(checkPoint=loaderCheckPoint) # 创建模型实例if 'FastChatOpenAILLM' in llm_model_info["provides"]: # 如果模型信息中的provides包含'FastChatOpenAILLM'modelInsLLM.set_api_base_url(llm_model_info['api_base_url']) # 设置API基础URLmodelInsLLM.call_model_name(llm_model_info['name']) # 设置模型名称return modelInsLLM # 返回模型实例
3.3 configs:配置文件存储model_config.py
import torch.cuda
import torch.backends
import os
import logging
import uuidLOG_FORMAT = "%(levelname) -5s %(asctime)s" "-1d: %(message)s"
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.basicConfig(format=LOG_FORMAT)# 在以下字典中修改属性值,以指定本地embedding模型存储位置
# 如将 "text2vec": "GanymedeNil/text2vec-large-chinese" 修改为 "text2vec": "User/Downloads/text2vec-large-chinese"
# 此处请写绝对路径
embedding_model_dict = {"ernie-tiny": "nghuyong/ernie-3.0-nano-zh","ernie-base": "nghuyong/ernie-3.0-base-zh","text2vec-base": "shibing624/text2vec-base-chinese","text2vec": "GanymedeNil/text2vec-large-chinese","m3e-small": "moka-ai/m3e-small","m3e-base": "moka-ai/m3e-base",
}# Embedding model name
EMBEDDING_MODEL = "text2vec"# Embedding running device
EMBEDDING_DEVICE = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"# supported LLM models
# llm_model_dict 处理了loader的一些预设行为,如加载位置,模型名称,模型处理器实例
# 在以下字典中修改属性值,以指定本地 LLM 模型存储位置
# 如将 "chatglm-6b" 的 "local_model_path" 由 None 修改为 "User/Downloads/chatglm-6b"
# 此处请写绝对路径
llm_model_dict = {"chatglm-6b-int4-qe": {"name": "chatglm-6b-int4-qe","pretrained_model_name": "THUDM/chatglm-6b-int4-qe","local_model_path": None,"provides": "ChatGLM"},"chatglm-6b-int4": {"name": "chatglm-6b-int4","pretrained_model_name": "THUDM/chatglm-6b-int4","local_model_path": None,"provides": "ChatGLM"},"chatglm-6b-int8": {"name": "chatglm-6b-int8","pretrained_model_name": "THUDM/chatglm-6b-int8","local_model_path": None,"provides": "ChatGLM"},"chatglm-6b": {"name": "chatglm-6b","pretrained_model_name": "THUDM/chatglm-6b","local_model_path": None,"provides": "ChatGLM"},"chatglm2-6b": {"name": "chatglm2-6b","pretrained_model_name": "THUDM/chatglm2-6b","local_model_path": None,"provides": "ChatGLM"},"chatglm2-6b-int4": {"name": "chatglm2-6b-int4","pretrained_model_name": "THUDM/chatglm2-6b-int4","local_model_path": None,"provides": "ChatGLM"},"chatglm2-6b-int8": {"name": "chatglm2-6b-int8","pretrained_model_name": "THUDM/chatglm2-6b-int8","local_model_path": None,"provides": "ChatGLM"},"chatyuan": {"name": "chatyuan","pretrained_model_name": "ClueAI/ChatYuan-large-v2","local_model_path": None,"provides": None},"moss": {"name": "moss","pretrained_model_name": "fnlp/moss-moon-003-sft","local_model_path": None,"provides": "MOSSLLM"},"vicuna-13b-hf": {"name": "vicuna-13b-hf","pretrained_model_name": "vicuna-13b-hf","local_model_path": None,"provides": "LLamaLLM"},# 通过 fastchat 调用的模型请参考如下格式"fastchat-chatglm-6b": {"name": "chatglm-6b", # "name"修改为fastchat服务中的"model_name""pretrained_model_name": "chatglm-6b","local_model_path": None,"provides": "FastChatOpenAILLM", # 使用fastchat api时,需保证"provides"为"FastChatOpenAILLM""api_base_url": "http://localhost:8000/v1" # "name"修改为fastchat服务中的"api_base_url"},"fastchat-chatglm2-6b": {"name": "chatglm2-6b", # "name"修改为fastchat服务中的"model_name""pretrained_model_name": "chatglm2-6b","local_model_path": None,"provides": "FastChatOpenAILLM", # 使用fastchat api时,需保证"provides"为"FastChatOpenAILLM""api_base_url": "http://localhost:8000/v1" # "name"修改为fastchat服务中的"api_base_url"},# 通过 fastchat 调用的模型请参考如下格式"fastchat-vicuna-13b-hf": {"name": "vicuna-13b-hf", # "name"修改为fastchat服务中的"model_name""pretrained_model_name": "vicuna-13b-hf","local_model_path": None,"provides": "FastChatOpenAILLM", # 使用fastchat api时,需保证"provides"为"FastChatOpenAILLM""api_base_url": "http://localhost:8000/v1" # "name"修改为fastchat服务中的"api_base_url"},
}# LLM 名称
LLM_MODEL = "chatglm-6b"
# 量化加载8bit 模型
LOAD_IN_8BIT = False
# Load the model with bfloat16 precision. Requires NVIDIA Ampere GPU.
BF16 = False
# 本地lora存放的位置
LORA_DIR = "loras/"# LLM lora path,默认为空,如果有请直接指定文件夹路径
LLM_LORA_PATH = ""
USE_LORA = True if LLM_LORA_PATH else False# LLM streaming reponse
STREAMING = True# Use p-tuning-v2 PrefixEncoder
USE_PTUNING_V2 = False# LLM running device
LLM_DEVICE = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"# 知识库默认存储路径
KB_ROOT_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "knowledge_base")# 基于上下文的prompt模版,请务必保留"{question}"和"{context}"
PROMPT_TEMPLATE = """已知信息:
{context} 根据上述已知信息,简洁和专业的来回答用户的问题。如果无法从中得到答案,请说 “根据已知信息无法回答该问题” 或 “没有提供足够的相关信息”,不允许在答案中添加编造成分,答案请使用中文。 问题是:{question}"""# 缓存知识库数量,如果是ChatGLM2,ChatGLM2-int4,ChatGLM2-int8模型若检索效果不好可以调成’10’
CACHED_VS_NUM = 1# 文本分句长度
SENTENCE_SIZE = 100# 匹配后单段上下文长度
CHUNK_SIZE = 250# 传入LLM的历史记录长度
LLM_HISTORY_LEN = 3# 知识库检索时返回的匹配内容条数
VECTOR_SEARCH_TOP_K = 5# 知识检索内容相关度 Score, 数值范围约为0-1100,如果为0,则不生效,经测试设置为小于500时,匹配结果更精准
VECTOR_SEARCH_SCORE_THRESHOLD = 0NLTK_DATA_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "nltk_data")FLAG_USER_NAME = uuid.uuid4().hexlogger.info(f"""
loading model config
llm device: {LLM_DEVICE}
embedding device: {EMBEDDING_DEVICE}
dir: {os.path.dirname(os.path.dirname(__file__))}
flagging username: {FLAG_USER_NAME}
""")# 是否开启跨域,默认为False,如果需要开启,请设置为True
# is open cross domain
OPEN_CROSS_DOMAIN = False# Bing 搜索必备变量
# 使用 Bing 搜索需要使用 Bing Subscription Key,需要在azure port中申请试用bing search
# 具体申请方式请见
# https://learn.microsoft.com/en-us/bing/search-apis/bing-web-search/create-bing-search-service-resource
# 使用python创建bing api 搜索实例详见:
# https://learn.microsoft.com/en-us/bing/search-apis/bing-web-search/quickstarts/rest/python
BING_SEARCH_URL = "https://api.bing.microsoft.com/v7.0/search"
# 注意不是bing Webmaster Tools的api key,# 此外,如果是在服务器上,报Failed to establish a new connection: [Errno 110] Connection timed out
# 是因为服务器加了防火墙,需要联系管理员加白名单,如果公司的服务器的话,就别想了GG
BING_SUBSCRIPTION_KEY = ""# 是否开启中文标题加强,以及标题增强的相关配置
# 通过增加标题判断,判断哪些文本为标题,并在metadata中进行标记;
# 然后将文本与往上一级的标题进行拼合,实现文本信息的增强。
ZH_TITLE_ENHANCE = False
3.4 loader:文档加载与text转换
3.4.1 loader/pdf_loader.py
# 导入类型提示模块,用于强化代码的可读性和健壮性
from typing import List# 导入UnstructuredFileLoader,这是一个从非结构化文件中加载文档的类
from langchain.document_loaders.unstructured import UnstructuredFileLoader# 导入PaddleOCR,这是一个开源的OCR工具,用于从图片中识别和读取文字
from paddleocr import PaddleOCR# 导入os模块,用于处理文件和目录
import os# 导入fitz模块,用于处理PDF文件
import fitz# 导入nltk模块,用于处理文本数据
import nltk# 导入模型配置文件中的NLTK_DATA_PATH,这是nltk数据的路径
from configs.model_config import NLTK_DATA_PATH# 设置nltk数据的路径,将模型配置中的路径添加到nltk的数据路径中
nltk.data.path = [NLTK_DATA_PATH] + nltk.data.path# 定义一个类,UnstructuredPaddlePDFLoader,该类继承自UnstructuredFileLoader
class UnstructuredPaddlePDFLoader(UnstructuredFileLoader):# 定义一个内部方法_get_elements,返回一个列表def _get_elements(self) -> List:# 定义一个内部函数pdf_ocr_txt,用于从pdf中进行OCR并输出文本文件def pdf_ocr_txt(filepath, dir_path="tmp_files"):# 将dir_path与filepath的目录部分合并成一个新的路径full_dir_path = os.path.join(os.path.dirname(filepath), dir_path)# 如果full_dir_path对应的目录不存在,则创建这个目录if not os.path.exists(full_dir_path):os.makedirs(full_dir_path)# 创建一个PaddleOCR实例,设置一些参数ocr = PaddleOCR(use_angle_cls=True, lang="ch", use_gpu=False, show_log=False)# 打开pdf文件doc = fitz.open(filepath)# 创建一个txt文件的路径txt_file_path = os.path.join(full_dir_path, f"{os.path.split(filepath)[-1]}.txt")# 创建一个临时的图片文件路径img_name = os.path.join(full_dir_path, 'tmp.png')# 打开txt_file_path对应的文件,并以写模式打开with open(txt_file_path, 'w', encoding='utf-8') as fout:# 遍历pdf的所有页面for i in range(doc.page_count):# 获取当前页面page = doc[i]# 获取当前页面的文本内容,并写入txt文件text = page.get_text("")fout.write(text)fout.write("\n")# 获取当前页面的所有图片img_list = page.get_images()# 遍历所有图片for img in img_list:# 将图片转换为Pixmap对象pix = fitz.Pixmap(doc, img[0])# 如果图片有颜色信息,则将其转换为RGB格式if pix.n - pix.alpha >= 4:pix = fitz.Pixmap(fitz.csRGB, pix)# 保存图片pix.save(img_name)# 对图片进行OCR识别result = ocr.ocr(img_name)# 从OCR结果中提取文本,并写入txt文件ocr_result = [i[1][0] for line in result for i in line]fout.write("\n".join(ocr_result))# 如果图片文件存在,则删除它if os.path.exists(img_name):os.remove(img_name)# 返回txt文件的路径return txt_file_path# 调用上面定义的函数,获取txt文件的路径txt_file_path = pdf_ocr_txt(self.file_path)# 导入partition_text函数,该函数用于将文本文件分块from unstructured.partition.text import partition_text# 对txt文件进行分块,并返回分块结果return partition_text(filename=txt_file_path, **self.unstructured_kwargs)# 运行入口
if __name__ == "__main__":# 导入sys模块,用于操作Python的运行环境import sys# 将当前文件的上一级目录添加到Python的搜索路径中sys.path.append(os.path.dirname(os.path.dirname(__file__)))# 定义一个pdf文件的路径filepath = os.path.join(os.path.dirname(os.path.dirname(__file__)), "knowledge_base", "samples", "content", "test.pdf")# 创建一个UnstructuredPaddlePDFLoader的实例loader = UnstructuredPaddlePDFLoader(filepath, mode="elements")# 加载文档docs = loader.load()# 遍历并打印所有文档for doc in docs:print(doc)
// 待更..
3.5 textsplitter:文档切分
3.5.1 textsplitter/ali_text_splitter.py
ali_text_splitter.py 代码如下所示
# 导入CharacterTextSplitter模块,用于文本切分
from langchain.text_splitter import CharacterTextSplitter
import re # 导入正则表达式模块,用于文本匹配和替换
from typing import List # 导入List类型,用于指定返回的数据类型# 定义一个新的类AliTextSplitter,继承自CharacterTextSplitter
class AliTextSplitter(CharacterTextSplitter): # 类的初始化函数,如果参数pdf为True,那么使用pdf文本切分规则,否则使用默认规则def __init__(self, pdf: bool = False, **kwargs): # 调用父类的初始化函数,接收传入的其他参数super().__init__(**kwargs) self.pdf = pdf # 将pdf参数保存为类的成员变量# 定义文本切分方法,输入参数为一个字符串,返回值为字符串列表def split_text(self, text: str) -> List[str]: if self.pdf: # 如果pdf参数为True,那么对文本进行预处理# 替换掉连续的3个及以上的换行符为一个换行符text = re.sub(r"\n{3,}", r"\n", text) # 将所有的空白字符(包括空格、制表符、换页符等)替换为一个空格text = re.sub('\s', " ", text) # 将连续的两个换行符替换为一个空字符text = re.sub("\n\n", "", text) # 导入pipeline模块,用于创建一个处理流程from modelscope.pipelines import pipeline # 创建一个document-segmentation任务的处理流程# 用的模型为damo/nlp_bert_document-segmentation_chinese-base,计算设备为cpup = pipeline(task="document-segmentation",model='damo/nlp_bert_document-segmentation_chinese-base',device="cpu")result = p(documents=text) # 对输入的文本进行处理,返回处理结果sent_list = [i for i in result["text"].split("\n\t") if i] # 将处理结果按照换行符和制表符进行切分,得到句子列表return sent_list # 返回句子列表
其中,有三点值得注意下
- 参数use_document_segmentation指定是否用语义切分文档
此处采取的文档语义分割模型为达摩院开源的:nlp_bert_document-segmentation_chinese-base (这是其论文) - 另,如果使用模型进行文档语义切分,那么需要安装:
modelscope[nlp]:pip install "modelscope[nlp]" -f https://modelscope.oss-cn-beijing.aliyuncs.com/releases/repo.html
- 且考虑到使用了三个模型,可能对于低配置gpu不太友好,因此这里将模型load进cpu计算,有需要的话可以替换device为自己的显卡id
3.6 knowledge_bas:index.faiss/index.pkl
knowledge_bas下面有两个文件,一个content 即用户上传的原始文件,vector_store则用于存储向量库⽂件,即本地知识库本体,因为content因人而异 谁上传啥就是啥 所以没啥好分析,而vector_store下面则有两个文件,一个index.faiss,一个index.pkl
3.7 chains:向量搜索/匹配
本节开头图中“FAISS索引、FAISS搜索”中的“FAISS”是Facebook AI推出的一种用于有效搜索大规模高维向量空间中相似度的库,在大规模数据集中快速找到与给定向量最相似的向量是很多AI应用的重要组成部分,例如在推荐系统、自然语言处理、图像检索等领域
3.7.1 chains/modules /vectorstores.py文件:根据查询向量query在向量数据库中查找与query相似的文本向量
主要是关于FAISS (Facebook AI Similarity Search)的使用,以及一个FAISS向量存储类(FAISSVS,FAISSVS类继承自FAISS类)的定义,包含以下主要方法:
- max_marginal_relevance_search
给定查询语句,首先将查询语句转换为嵌入向量「embedding = self.embedding_function(query)」,然后调用 max_marginal_relevance_search_by_vector 函数进行MMR搜索
max_marginal_relevance_search_by_vector# 使用最大边际相关性返回被选中的文本 def max_marginal_relevance_search(self,query: str, # 查询k: int = 4, # 返回的文档数量,默认为 4fetch_k: int = 20, # 用于传递给 MMR 算法的抓取文档数量**kwargs: Any, ) -> List[Tuple[Document, float]]: # 查询向量化embedding = self.embedding_function(query)# 调用:max_marginal_relevance_search_by_vectordocs = self.max_marginal_relevance_search_by_vector(embedding, k, fetch_k)return docs
通过给定的嵌入向量,使用最大边际相关性(Maximal Marginal Relevance, MMR)方法来返回相关的文本
MMR是一种解决查询结果多样性和相关性的算法,具体来说,它不仅要求返回的文本与查询尽可能相似,而且希望返回的文本集之间尽可能多样# 使用最大边际相关性返回被选中的文档,最大边际相关性旨在优化查询的相似性和选定文本之间的多样性 def max_marginal_relevance_search_by_vector(self, embedding: List[float], k: int = 4, fetch_k: int = 20, **kwargs: Any ) -> List[Tuple[Document, float]]:# 使用索引在文本中搜索与嵌入向量相似的内容,返回最相似的fetch_k个文本的得分和索引scores, indices = self.index.search(np.array([embedding], dtype=np.float32), fetch_k) # 通过索引从文本中重构出嵌入向量,-1表示没有足够的文本返回embeddings = [self.index.reconstruct(int(i)) for i in indices[0] if i != -1] # 使用最大边际相关性算法选择出k个最相关的文本mmr_selected = maximal_marginal_relevance(np.array([embedding], dtype=np.float32), embeddings, k=k) selected_indices = [indices[0][i] for i in mmr_selected] # 获取被选中的文本的索引selected_scores = [scores[0][i] for i in mmr_selected] # 获取被选中的文本的得分docs = []for i, score in zip(selected_indices, selected_scores): # 对于每个被选中的文本索引和得分if i == -1: # 如果索引为-1,表示没有足够的文本返回continue_id = self.index_to_docstore_id[i] # 通过索引获取文本的iddoc = self.docstore.search(_id) # 通过id在文档库中搜索文本if not isinstance(doc, Document): # 如果搜索到的文本不是Document类型,抛出错误raise ValueError(f"Could not find document for id {_id}, got {doc}")docs.append((doc, score)) # 将文本和得分添加到结果列表中return docs # 返回结果列表
- __from
用于从一组文本和对应的嵌入向量创建一个FAISSVS实例。该方法首先创建一个FAISS索引并添加嵌入向量,然后创建一个文本存储以存储与每个嵌入向量关联的文本# 从给定的文本、嵌入向量、元数据等信息构建一个FAISS索引对象 def __from(cls,texts: List[str], # 文本列表,每个文本将被转化为一个文本对象embeddings: List[List[float]], # 对应文本的嵌入向量列表embedding: Embeddings, # 嵌入向量生成器,用于将查询语句转化为嵌入向量metadatas: Optional[List[dict]] = None,**kwargs: Any, ) -> FAISS:faiss = dependable_faiss_import() # 导入FAISS库index = faiss.IndexFlatIP(len(embeddings[0])) # 使用FAISS库创建一个新的索引,索引的维度等于嵌入文本向量的长度index.add(np.array(embeddings, dtype=np.float32)) # 将嵌入向量添加到FAISS索引中# quantizer = faiss.IndexFlatL2(len(embeddings[0]))# index = faiss.IndexIVFFlat(quantizer, len(embeddings[0]), 100)# index.train(np.array(embeddings, dtype=np.float32))# index.add(np.array(embeddings, dtype=np.float32))documents = []for i, text in enumerate(texts): # 对于每一段文本# 获取对应的元数据,如果没有提供元数据则使用空字典metadata = metadatas[i] if metadatas else {} # 创建一个文本对象并添加到文本列表中documents.append(Document(page_content=text, metadata=metadata)) # 为每个文本生成一个唯一的IDindex_to_id = {i: str(uuid.uuid4()) for i in range(len(documents))} # 创建一个文本库,用于存储文本对象和对应的IDdocstore = InMemoryDocstore({index_to_id[i]: doc for i, doc in enumerate(documents)} )# 返回FAISS对象return cls(embedding.embed_query, index, docstore, index_to_id)
以上就是这段代码的主要内容,通过使用FAISS和MMR,它可以帮助我们在大量文本中找到与给定查询最相关的文本
3.7.2 chains /local_doc_qa.py代码文件:向量搜索
- 导入包和模块
代码开始的部分是一系列的导入语句,导入了必要的 Python 包和模块,包括文件加载器,文本分割器,模型配置,以及一些 Python 内建模块和其他第三方库 - 改写 HuggingFaceEmbeddings 类的哈希方法
代码定义了一个名为 _embeddings_hash 的函数,并将其赋值给 HuggingFaceEmbeddings 类的 __hash__ 方法。这样做的目的是使 HuggingFaceEmbeddings 对象可以被哈希,即可以作为字典的键或者被加入到集合中 - 载入向量存储器
定义了一个名为 load_vector_store 的函数,这个函数用于从本地加载一个向量存储器,返回 FAISS 类的对象。其中使用了 lru_cache 装饰器,可以缓存最近使用的 CACHED_VS_NUM 个结果,提高代码效率 - 文件树遍历
tree 函数是一个递归函数,用于遍历指定目录下的所有文件,返回一个包含所有文件的完整路径和文件名的列表。它可以忽略指定的文件或目录 - 加载文件:
load_file 函数根据文件后缀名选择合适的加载器和文本分割器,加载并分割文件 - 生成提醒:
generate_prompt 函数用于根据相关文档和查询生成一个提醒。提醒的模板由 prompt_template 参数提供 - 创建文档列表
search_result2docs# 创建一个空列表,用于存储文档 def search_result2docs(search_results):docs = []# 对于搜索结果中的每一项for result in search_results:# 创建一个文档对象# 如果结果中包含"snippet"关键字,则其值作为页面内容,否则页面内容为空字符串# 如果结果中包含"link"关键字,则其值作为元数据中的源链接,否则源链接为空字符串# 如果结果中包含"title"关键字,则其值作为元数据中的文件名,否则文件名为空字符串doc = Document(page_content=result["snippet"] if "snippet" in result.keys() else "",metadata={"source": result["link"] if "link" in result.keys() else "","filename": result["title"] if "title" in result.keys() else ""})# 将创建的文档对象添加到列表中docs.append(doc)# 返回文档列表return docs
之后,定义了一个名为 LocalDocQA 的类,主要用于基于文档的问答任务。基于文档的问答任务的主要功能是,根据一组给定的文档(这里被称为知识库)以及用户输入的问题,返回一个答案,LocalDocQA 类的主要方法包括:
- init_cfg():此方法初始化一些变量,包括将 llm_model(一个语言模型用于生成答案)分配给 self.llm,将一个基于HuggingFace的嵌入模型分配给 self.embeddings,将输入参数 top_k 分配给 self.top_k
- init_knowledge_vector_store():此方法负责初始化知识向量库。它首先检查输入的文件路径,对于路径中的每个文件,将文件内容加载到 Document 对象中,然后将这些文档转换为嵌入向量,并将它们存储在向量库中
- one_knowledge_add():此方法用于向知识库中添加一个新的知识文档。它将输入的标题和内容创建为一个 Document 对象,然后将其转换为嵌入向量,并添加到向量库中
- get_knowledge_based_answer():此方法是基于给定的知识库和用户输入的问题,来生成一个答案。它首先根据用户输入的问题找到知识库中最相关的文档,然后生成一个包含相关文档和用户问题的提示,将提示传递给 llm_model 来生成答案
且注意一点,这个函数调用了上面已经实现好的:similarity_search_with_score - get_knowledge_based_conent_test():此方法是为了测试的,它将返回与输入查询最相关的文档和查询提示
# query 查询内容
# vs_path 知识库路径
# chunk_conent 是否启用上下文关联
# score_threshold 搜索匹配score阈值
# vector_search_top_k 搜索知识库内容条数,默认搜索5条结果
# chunk_sizes 匹配单段内容的连接上下文长度
def get_knowledge_based_conent_test(self, query, vs_path, chunk_conent,
score_threshold=VECTOR_SEARCH_SCORE_THRESHOLD,
vector_search_top_k=VECTOR_SEARCH_TOP_K, chunk_size=CHUNK_SIZE): - get_search_result_based_answer():此方法与 get_knowledge_based_answer() 类似,不过这里使用的是 bing_search 的结果作为知识库
如你所见,这个函数和上面那个函数的主要区别在于,这个函数是直接利用搜索引擎的搜索结果来生成回答的,而上面那个函数是通过查询相似度搜索来找到最相关的文本,然后基于这些文本生成回答的def get_search_result_based_answer(self, query, chat_history=[], streaming: bool = STREAMING):# 对查询进行 Bing 搜索,并获取搜索结果results = bing_search(query)# 将搜索结果转化为文本的形式result_docs = search_result2docs(results)# 生成用于提问的提示语prompt = generate_prompt(result_docs, query)# 通过 LLM(长语言模型)生成回答for answer_result in self.llm.generatorAnswer(prompt=prompt, history=chat_history,streaming=streaming):# 获取回答的文本resp = answer_result.llm_output["answer"]# 获取聊天历史history = answer_result.history# 将聊天历史中的最后一项的提问替换为当前的查询history[-1][0] = query# 组装回答的结果response = {"query": query,"result": resp,"source_documents": result_docs}# 返回回答的结果和聊天历史yield response, history
而这个bing_search则在3.1.2节中已经定义 - 接下来是分别用于从向量存储中删除文件、更新文件以及列出文件的三个方法
delete_file_from_vector_store
update_file_from_vector_store
list_file_from_vector_store# 删除向量存储中的文件def delete_file_from_vector_store(self,filepath: str or List[str], # 文件路径,可以是单个文件或多个文件列表vs_path): # 向量存储路径vector_store = load_vector_store(vs_path, self.embeddings) # 从给定路径加载向量存储status = vector_store.delete_doc(filepath) # 删除指定文件return status # 返回删除状态# 更新向量存储中的文件def update_file_from_vector_store(self,filepath: str or List[str], # 需要更新的文件路径,可以是单个文件或多个文件列表vs_path, # 向量存储路径docs: List[Document],): # 需要更新的文件内容,文件以文档形式给出vector_store = load_vector_store(vs_path, self.embeddings) # 从给定路径加载向量存储status = vector_store.update_doc(filepath, docs) # 更新指定文件return status # 返回更新状态# 列出向量存储中的文件def list_file_from_vector_store(self,vs_path, # 向量存储路径fullpath=False): # 是否返回完整路径,如果为 False,则只返回文件名vector_store = load_vector_store(vs_path, self.embeddings) # 从给定路径加载向量存储docs = vector_store.list_docs() # 列出所有文件if fullpath: # 如果需要完整路径return docs # 返回完整路径列表else: # 如果只需要文件名return [os.path.split(doc)[-1] for doc in docs] # 用 os.path.split 将路径和文件名分离,只返回文件名列表
__main__
部分的代码是 LocalDocQA 类的实例化和使用示例
- 它首先初始化了一个 llm_model_ins 对象
- 然后创建了一个 LocalDocQA 的实例并调用其 init_cfg() 方法进行初始化
- 之后,它指定了一个查询和知识库的路径
- 然后调用 get_knowledge_based_answer() 或 get_search_result_based_answer() 方法获取基于该查询的答案,并打印出答案和来源文档的信息
3.7.3 chains/text_load.py
chain这个文件夹下 还有最后一个项目文件(langchain-ChatGLM/text_load.py at master · imClumsyPanda/langchain-ChatGLM · GitHub),如下所示
import os
import pinecone
from tqdm import tqdm
from langchain.llms import OpenAI
from langchain.text_splitter import SpacyTextSplitter
from langchain.document_loaders import TextLoader
from langchain.document_loaders import DirectoryLoader
from langchain.indexes import VectorstoreIndexCreator
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Pinecone#一些配置文件
openai_key="你的key" # 注册 openai.com 后获得
pinecone_key="你的key" # 注册 app.pinecone.io 后获得
pinecone_index="你的库" #app.pinecone.io 获得
pinecone_environment="你的Environment" # 登录pinecone后,在indexes页面 查看Environment
pinecone_namespace="你的Namespace" #如果不存在自动创建#科学上网你懂得
os.environ['HTTP_PROXY'] = 'http://127.0.0.1:7890'
os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890'#初始化pinecone
pinecone.init(api_key=pinecone_key,environment=pinecone_environment
)
index = pinecone.Index(pinecone_index)#初始化OpenAI的embeddings
embeddings = OpenAIEmbeddings(openai_api_key=openai_key)#初始化text_splitter
text_splitter = SpacyTextSplitter(pipeline='zh_core_web_sm',chunk_size=1000,chunk_overlap=200)# 读取目录下所有后缀是txt的文件
loader = DirectoryLoader('../docs', glob="**/*.txt", loader_cls=TextLoader)#读取文本文件
documents = loader.load()# 使用text_splitter对文档进行分割
split_text = text_splitter.split_documents(documents)
try:for document in tqdm(split_text):# 获取向量并储存到pineconePinecone.from_documents([document], embeddings, index_name=pinecone_index)
except Exception as e:print(f"Error: {e}")quit()
3.8 vectorstores:MyFAISS.py
两个文件,一个__init__.py (就一行代码:from .MyFAISS import MyFAISS),另一个MyFAISS.py,如下代码所示
# 从langchain.vectorstores库导入FAISS
from langchain.vectorstores import FAISS
# 从langchain.vectorstores.base库导入VectorStore
from langchain.vectorstores.base import VectorStore
# 从langchain.vectorstores.faiss库导入dependable_faiss_import
from langchain.vectorstores.faiss import dependable_faiss_import from typing import Any, Callable, List, Dict # 导入类型检查库
from langchain.docstore.base import Docstore # 从langchain.docstore.base库导入Docstore# 从langchain.docstore.document库导入Document
from langchain.docstore.document import Document import numpy as np # 导入numpy库,用于科学计算
import copy # 导入copy库,用于数据复制
import os # 导入os库,用于操作系统相关的操作
from configs.model_config import * # 从configs.model_config库导入所有内容# 定义MyFAISS类,继承自FAISS和VectorStore两个父类
class MyFAISS(FAISS, VectorStore):
接下来,逐一实现以下函数
3.8.1 定义类的初始化函数:__init__
# 定义类的初始化函数def __init__(self,embedding_function: Callable,index: Any,docstore: Docstore,index_to_docstore_id: Dict[int, str],normalize_L2: bool = False,):# 调用父类FAISS的初始化函数super().__init__(embedding_function=embedding_function,index=index,docstore=docstore,index_to_docstore_id=index_to_docstore_id,normalize_L2=normalize_L2)# 初始化分数阈值self.score_threshold=VECTOR_SEARCH_SCORE_THRESHOLD# 初始化块大小self.chunk_size = CHUNK_SIZE# 初始化块内容self.chunk_conent = False
3.8.2 seperate_list:将一个列表分解成多个子列表
# 定义函数seperate_list,将一个列表分解成多个子列表,每个子列表中的元素在原列表中是连续的def seperate_list(self, ls: List[int]) -> List[List[int]]:# TODO: 增加是否属于同一文档的判断lists = []ls1 = [ls[0]]for i in range(1, len(ls)):if ls[i - 1] + 1 == ls[i]:ls1.append(ls[i])else:lists.append(ls1)ls1 = [ls[i]]lists.append(ls1)return lists
3.8.3 similarity_search_with_score_by_vector,根据输入的向量,查找最接近的k个文本
similarity_search_with_score_by_vector 函数用于通过向量进行相似度搜索,返回与给定嵌入向量最相似的文本和对应的分数
# 定义函数similarity_search_with_score_by_vector,根据输入的向量,查找最接近的k个文本def similarity_search_with_score_by_vector(self, embedding: List[float], k: int = 4) -> List[Document]:# 调用dependable_faiss_import函数,导入faiss库faiss = dependable_faiss_import()# 将输入的列表转换为numpy数组,并设置数据类型为float32vector = np.array([embedding], dtype=np.float32)# 如果需要进行L2归一化,则调用faiss.normalize_L2函数进行归一化if self._normalize_L2:faiss.normalize_L2(vector)# 调用faiss库的search函数,查找与输入向量最接近的k个向量,并返回他们的分数和索引scores, indices = self.index.search(vector, k)# 初始化一个空列表,用于存储找到的文本docs = []# 初始化一个空集合,用于存储文本的idid_set = set()# 获取文本库中文本的数量store_len = len(self.index_to_docstore_id)# 初始化一个布尔变量,表示是否需要重新排列id列表rearrange_id_list = False# 遍历找到的索引和分数for j, i in enumerate(indices[0]):# 如果索引为-1,或者分数小于阈值,则跳过这个索引if i == -1 or 0 < self.score_threshold < scores[0][j]:# This happens when not enough docs are returned.continue# 如果索引存在于index_to_docstore_id字典中,则获取对应的文本idif i in self.index_to_docstore_id:_id = self.index_to_docstore_id[i]# 如果索引不存在于index_to_docstore_id字典中,则跳过这个索引else:continue# 从文本库中搜索对应id的文本doc = self.docstore.search(_id)# 如果不需要拆分块内容,或者文档的元数据中没有context_expand字段,或者context_expand字段的值为false,则执行以下代码if (not self.chunk_conent) or ("context_expand" in doc.metadata and not doc.metadata["context_expand"]):# 匹配出的文本如果不需要扩展上下文则执行如下代码# 如果搜索到的文本不是Document类型,则抛出异常if not isinstance(doc, Document):raise ValueError(f"Could not find document for id {_id}, got {doc}")# 在文本的元数据中添加score字段,其值为找到的分数doc.metadata["score"] = int(scores[0][j])# 将文本添加到docs列表中docs.append(doc)continue# 将文本id添加到id_set集合中id_set.add(i)# 获取文本的长度docs_len = len(doc.page_content)# 遍历范围在1到i和store_len - i之间的数字kfor k in range(1, max(i, store_len - i)):# 初始化一个布尔变量,表示是否需要跳出循环break_flag = False# 如果文本的元数据中有context_expand_method字段,并且其值为"forward",则扩展范围设置为[i + k]if "context_expand_method" in doc.metadata and doc.metadata["context_expand_method"] == "forward":expand_range = [i + k]# 如果文本的元数据中有context_expand_method字段,并且其值为"backward",则扩展范围设置为[i - k]elif "context_expand_method" in doc.metadata and doc.metadata["context_expand_method"] == "backward":expand_range = [i - k]# 如果文本的元数据中没有context_expand_method字段,或者context_expand_method字段的值不是"forward"也不是"backward",则扩展范围设置为[i + k, i - k]else:expand_range = [i + k, i - k]# 遍历扩展范围for l in expand_range:# 如果l不在id_set集合中,并且l在0到len(self.index_to_docstore_id)之间,则执行以下代码if l not in id_set and 0 <= l < len(self.index_to_docstore_id):# 获取l对应的文本id_id0 = self.index_to_docstore_id[l]# 从文本库中搜索对应id的文本doc0 = self.docstore.search(_id0)# 如果文本长度加上新文档的长度大于块大小,或者新文本的源不等于当前文本的源,则设置break_flag为true,跳出循环if docs_len + len(doc0.page_content) > self.chunk_size or doc0.metadata["source"] != \doc.metadata["source"]:break_flag = Truebreak# 如果新文本的源等于当前文本的源,则将新文本的长度添加到文本长度上,将l添加到id_set集合中,设置rearrange_id_list为trueelif doc0.metadata["source"] == doc.metadata["source"]:docs_len += len(doc0.page_content)id_set.add(l)rearrange_id_list = True# 如果break_flag为true,则跳出循环if break_flag:break# 如果不需要拆分块内容,或者不需要重新排列id列表,则返回docs列表if (not self.chunk_conent) or (not rearrange_id_list):return docs# 如果id_set集合的长度为0,并且分数阈值大于0,则返回空列表if len(id_set) == 0 and self.score_threshold > 0:return []# 对id_set集合中的元素进行排序,并转换为列表id_list = sorted(list(id_set))# 调用seperate_list函数,将id_list分解成多个子列表id_lists = self.seperate_list(id_list)# 遍历id_lists中的每一个id序列for id_seq in id_lists:# 遍历id序列中的每一个idfor id in id_seq:# 如果id等于id序列的第一个元素,则从文档库中搜索对应id的文本,并深度拷贝这个文本if id == id_seq[0]:_id = self.index_to_docstore_id[id]# doc = self.docstore.search(_id)doc = copy.deepcopy(self.docstore.search(_id))# 如果id不等于id序列的第一个元素,则从文本库中搜索对应id的文档,将新文本的内容添加到当前文本的内容后面else:_id0 = self.index_to_docstore_id[id]doc0 = self.docstore.search(_id0)doc.page_content += " " + doc0.page_content# 如果搜索到的文本不是Document类型,则抛出异常if not isinstance(doc, Document):raise ValueError(f"Could not find document for id {_id}, got {doc}")# 计算文本的分数,分数等于id序列中的每一个id在分数列表中对应的分数的最小值doc_score = min([scores[0][id] for id in [indices[0].tolist().index(i) for i in id_seq if i in indices[0]]])# 在文本的元数据中添加score字段,其值为文档的分数doc.metadata["score"] = int(doc_score)# 将文本添加到docs列表中docs.append(doc)# 返回docs列表return docs
3.8.4 delete_doc方法:删除文本库中指定来源的文本
#定义了一个名为 delete_doc 的方法,这个方法用于删除文本库中指定来源的文本def delete_doc(self, source: str or List[str]):# 使用 try-except 结构捕获可能出现的异常try:# 如果 source 是字符串类型if isinstance(source, str):# 找出文本库中所有来源等于 source 的文本的idids = [k for k, v in self.docstore._dict.items() if v.metadata["source"] == source]# 获取向量存储的路径vs_path = os.path.join(os.path.split(os.path.split(source)[0])[0], "vector_store")# 如果 source 是列表类型else:# 找出文本库中所有来源在 source 列表中的文本的idids = [k for k, v in self.docstore._dict.items() if v.metadata["source"] in source]# 获取向量存储的路径vs_path = os.path.join(os.path.split(os.path.split(source[0])[0])[0], "vector_store")# 如果没有找到要删除的文本,返回失败信息if len(ids) == 0:return f"docs delete fail"# 如果找到了要删除的文本else:# 遍历所有要删除的文本idfor id in ids:# 获取该id在索引中的位置index = list(self.index_to_docstore_id.keys())[list(self.index_to_docstore_id.values()).index(id)]# 从索引中删除该idself.index_to_docstore_id.pop(index)# 从文本库中删除该id对应的文本self.docstore._dict.pop(id)# TODO: 从 self.index 中删除对应id,这是一个未完成的任务# self.index.reset()# 保存当前状态到本地self.save_local(vs_path)# 返回删除成功的信息return f"docs delete success"# 捕获异常except Exception as e:# 打印异常信息print(e)# 返回删除失败的信息return f"docs delete fail"
3.8.5 update_doc和lists_doc
# 定义了一个名为 update_doc 的方法,这个方法用于更新文档库中的文档def update_doc(self, source, new_docs):# 使用 try-except 结构捕获可能出现的异常try:# 删除旧的文档delete_len = self.delete_doc(source)# 添加新的文档ls = self.add_documents(new_docs)# 返回更新成功的信息return f"docs update success"# 捕获异常except Exception as e:# 打印异常信息print(e)# 返回更新失败的信息return f"docs update fail"# 定义了一个名为 list_docs 的方法,这个方法用于列出文档库中所有文档的来源def list_docs(self):# 遍历文档库中的所有文档,取出每个文档的来源,转换为集合,再转换为列表,最后返回这个列表return list(set(v.metadata["source"] for v in self.docstore._dict.values()))
// 待更
第四部分 LLM与知识图谱的结合
// 待更..
参考文献与推荐阅读
- langchain官网:https://python.langchain.com/,API列表:https://api.python.langchain.com/en/latest/api_reference.html
langchain中文网(翻译暂不佳) - LangChain全景图
- 一文搞懂langchain(忽略本标题,因为单看此文还不够)
- csunny/DB-GPT,https://db-gpt.readthedocs.io/en/latest/
- QLoRA:4-bit级别的量化+LoRA方法,用3090在DB-GPT上打造基于33B LLM的个人知识库
- 基于LangChain+LLM构建增强QA、用LangChain构建大语言模型应用、LangChain 是什么
- LangChain 中文入门教程
后记
本文经历了三个阶段
- 对langchain的梳理
langchain的组件很多,想理解透彻的话,需要一步步来
包括我自己刚开始看这个库的时候 真心是晕,无从下手,后来10天过后,可以直接一个文件一个文件的点开 直接看..
总之,凡事都是一个过程 - 对langchain-ChatGLM项目源码的解读
说实话,一开始也是挺晕的,因为各种项目文件又很多,好在后来历时一周总算梳理清楚了 - LLM与数据库、知识图谱的结合
创作、修改、优化记录
- 7.5-7.9日,每天写一一部分
- 7.10,完善第一部分关于什么是langchain的介绍
- 7.11,根据langchain-ChatGLM项目的最新更新,整理已写内容
- 7.12 写完前3.8节,且根据项目流程调整各个文件夹的解读顺序
相当于历时近一周,总算把 “langchain-ChatGLM的整体代码架构” 梳理清楚了 - 7.15,为方便理解,把整个langchain库划分为三个大层:基础层、能力层、应用层