原文:读懂 FastChat 大模型部署源码所需的异步编程基础 - 知乎
目录
0. 前言
1. 同步与异步的区别
2. 协程
3. 事件循环
4. await
5. 组合协程
6. 使用 Semaphore 限制并发数
7. 运行阻塞任务
8. 异步迭代器 async for
9. 异步上下文管理器 async with
10. 参考
本文是读懂 FastChat 大模型部署源码系列的第二篇,持续更新中,欢迎关注:
不理不理:读懂 FastChat 大模型部署源码所需的 Web 基础
不理不理:读懂 FastChat 大模型部署源码所需的异步编程基础
如果觉得本文有帮助,麻烦点个小小的赞~可以让更多人看到,谢谢大家啦~
0. 前言
FastChat 是 2023 年非常知名的一个大语言模型项目,该项目不仅提供了大语言模型全量参数微调、Lora参数微调、模型推断、模型量化、模型部署及调度等全套的源代码,而且还开源了他们基于 LLaMA2 底座进行指令微调的一系列 Vicuna 模型权重,因此非常适合学习和使用。
lm-sys/FastChat: An open platform for training, serving, and evaluating large language models. Release repo for Vicuna and Chatbot Arena. (github.com)github.com/lm-sys/FastChat
就 FastChat 模型部署部分而言,它分为三个部分:controller、worker、api_server。这三个服务使用 FastAPI + Uvicorn 的方式构建,都是单线程程序,且各自都支持并发
- controller 负责监控所有 workers 的状态,同时调度 worker,以保证各个同名 worker 之间的负载均衡
- worker 负责加载大语言模型的权重、tokenizer、对话模板等,同时对 api_server 传来的给定超参生成模型推断结果
- api_server 负责承接用户各种各样的 HTTP 调用请求,并最终将任务分发给 worker 进行推断。同时提供缺省参数、报错提示、序列长度检测、打印生成信息等功能
本文将会分享读懂 FastChat 模型部署源码的异步编程基础,绝不超纲(纲是 FastChat)
1. 同步与异步的区别
在传统的同步编程中,代码按照顺序逐行执行,前一个操作完成后才能执行下一个操作。若有一些耗时的操作则会导致整个程序的阻塞,降低程序的性能和响应能力。
而在异步编程中,当遇到耗时的操作(比如 IO)时不会等待操作完成,而是继续执行其他代码。这在有多个用户并发请求的情况下,异步方式编写的接口可以在 IO 等待的过程中去处理其他请求,从而提高程序的性能。
比方说我们去网上下载三张图片,使用同步编程实现:
import requests # requests 是仅支持同步编程的http请求库def download_img(url):print("开始下载:", url)response = requests.get(url) # 发送请求, 下载图片file_name = url.rsplit('_')[-1]with open(file_name, mode='wb') as f: # 将图片保存到本地f.write(response.content)print("下载完成")if __name__ == '__main__':url_list = ['https://img.zcool.cn/community/014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg','https://pic.ntimg.cn/20131112/10370254_003820848001_2.jpg','https://pic.ntimg.cn/file/20170926/9885883_140434796000_2.jpg']for item in url_list:download_img(item)# 运行结果:
# 开始下载: https://img.zcool.cn/community/014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg
# 下载完成
# 开始下载: https://pic.ntimg.cn/20131112/10370254_003820848001_2.jpg
# 下载完成
# 开始下载: https://pic.ntimg.cn/file/20170926/9885883_140434796000_2.jpg
# 下载完成
假设每下载一张图片需要 1 秒,那么上述程序完整执行大概需要 3 秒,时间主要花在了 IO 部分。
若使用异步编程,则只需 1 秒左右(代码看不懂可忽略)。
import aiohttp # aiohttp 是支持异步编程的http请求库
import asyncioasync def fetch(sess, url):print("发送请求: ", url)async with sess.get(url, verify_ssl=False) as response:print("等待响应: ", url)content = await response.content.read()file_name = url.rsplit('/')[-1]print("开始写入: ", file_name)with open(file_name, mode='wb') as f:f.write(content)async def main():async with aiohttp.ClientSession() as sess:url_list = ['https://img.zcool.cn/community/014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg','https://pic.ntimg.cn/20131112/10370254_003820848001_2.jpg','https://pic.ntimg.cn/file/20170926/9885883_140434796000_2.jpg']tasks = [asyncio.create_task(fetch(sess, url)) for url in url_list]await asyncio.wait(tasks)if __name__ == '__main__':asyncio.run(main())# 运行结果:
# 发送请求: https://img.zcool.cn/community/014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg
# 发送请求: https://pic.ntimg.cn/20131112/10370254_003820848001_2.jpg
# 发送请求: https://pic.ntimg.cn/file/20170926/9885883_140434796000_2.jpg
# 等待响应: https://pic.ntimg.cn/file/20170926/9885883_140434796000_2.jpg
# 开始写入: 9885883_140434796000_2.jpg
# 等待响应: https://pic.ntimg.cn/20131112/10370254_003820848001_2.jpg
# 等待响应: https://img.zcool.cn/community/014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg
# 开始写入: 014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg
# 开始写入: 10370254_003820848001_2.jpg
异步程序会在发送第一张图的下载请求时,不等结果的返回就开始第二张图的下载;会在发送第二张图的下载请求时,依然不等结果的返回就开始第三张图的下载。
像上面这种经过 async 和 await 关键字装饰的函数,称之为基于协程的异步函数,这种编程方式也叫异步编程。异步编程是通过让一个线程在执行某个任务的 IO 等待时间去执行其他任务,从而实现并发。
2. 协程
定义形式为 async def 的函数称之为协程(异步函数)。
# 定义一个协程函数
async def func():pass
调用协程只会创建协程对象,不会执行函数内部的代码。若想执行协程的内部代码,须配合事件循环一起使用。
协程的优势在于,能够在 IO 等待时执行其他协程,当 IO 操作结束后会自动回调至原先协程,这样就可以在节省资源的同时提高性能。另外,协程也让原本需要用异步+回调方式完成的非人类代码,用看似同步的方式写出来。
3. 事件循环
事件循环,可以把他当做是一个 while 循环,这个 while 循环会周期性的运行并执行一些任务,然后在特定条件下终止循环。
# 伪代码
任务列表 = [ 任务1, 任务2, 任务3,... ]
while True:可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回for 已就绪任务 in 已准备就绪的任务列表:执行 已就绪任务for 已完成的任务 in 已完成的任务列表:在任务列表中移除 已完成的任务如果 任务列表 中的任务都已完成,则终止循环
如果想要执行协程函数内部的代码,需要事件循环和协程对象配合才能实现,如:
import asyncioasync def func():print("inner code")# 方式一
# loop = asyncio.get_event_loop() # 创建一个事件循环
# loop.run_until_complete(func()) # 将协程当做任务提交到事件循环的任务列表中,协程执行完成之后终止# 方式二
asyncio.run(func()) # 是一个简便的写法,与方式一本质相同# 运行结果:
# inner code
asyncio 是 Python3.4 中的新增模块,它提供了一种机制,使得你可以用协程、IO 复用在单线程环境中编写并发模型。
上述代码可以简单理解为:将协程当做任务添加到事件循环的任务列表,然后事件循环会检测列表中的协程是否已准备就绪(默认可理解为就绪状态),如果准备就绪则执行其内部代码。
4. await
await 是在 Python3.5 中引入的关键字,表示等待其后对象运行结束,后面可接协程对象、Task 对象(封装后的协程对象)、IO 操作。
- await 后接 IO 操作时,会将当前协程(任务)挂起,事件循环此时会协调执行其他协程(任务),等 IO 操作完成之后,不管程序当前在执行哪个协程(任务),都会返回原先协程的挂起处继续往下执行。通常我们会使用 asyncio.sleep(t) 让当前协程阻塞 t 秒,以模拟 IO 操作
- await 后接协程对象或 Task 对象时,也会将当前协程挂起,转而去运行指定的协程或 Task。另外,当且仅当指定的协程或 Task 运行结束后,才能返回原先协程挂起的位置继续运行
值得注意的是,要调用协程,必须使用 await 关键字;另外也不能在同步函数里使用 await,否则会报错。下面使用异步编程方式执行 1+2=3 的计算过程。
import asyncioasync def compute(x, y):print("Compute %s + %s ..." % (x, y))await asyncio.sleep(1.0)return x + yasync def print_sum(x, y):print("Start compute ...")result = await compute(x, y)print("%s + %s = %s" % (x, y, result))asyncio.run(print_sum(1, 2))# 运行结果:
# Start compute ...
# Compute 1 + 2 ...
# 1 + 2 = 3
- 当事件循环开始时,它会寻找协程以执行调度,因为事件循环注册了 print_sum(),所以 print_sum() 被调用
- 执行到 result = await compute(x, y) 这条语句时(等同于 result = yield from compute(x, y)),由于 compute() 自身就是一个协程,因此 print_sum() 这个协程就会被暂时挂起
- compute() 被加入到事件循环中,程序流执行 compute() 中的 print 语句,打印”Compute %s + %s …”
- 然后执行 await asyncio.sleep(1.0),因为 asyncio.sleep() 也是一个协程,接着 compute() 就会被挂起,等待计时器读秒
- 在这 1 秒的过程中,事件循环会在队列中查询其他可以被调度的协程,而因为此前 print_sum() 与 compute() 都被挂起了,因此事件循环会停下来等待协程的调度
- 当计时器读秒结束后,程序流便会返回到 compute() 中执行 return 语句,结果返回到 print_sum() 的 result 中,最后打印 result
- 由于事件队列中此时没有可以调度的任务了,因此事件队列关闭,程序结束
上述示例在 IO 等待时无法演示切换到其他任务的效果,难以体会到协程的优势,要想在程序中创建多个任务对象,就需要使用 Task。
5. 组合协程
通过 asyncio.create_task(协程对象) 方式创建 Task 对象,这样可以让协程加入事件循环中等待被调度执行。
asyncio.gather(任务列表) 会将任务列表中传入的一系列任务合并成一个组合协程,其内部也是异步执行的。组合协程总的执行时间取决于任务列表中最耗时的那个任务,同时也只有当任务列表中的所有任务都执行完毕,才能返回主协程挂起处继续执行剩余的代码。
import asyncioasync def func(n):print("start: ", n)await asyncio.sleep(n * 0.1) # 模拟IO操作print("end: ", n)return nasync def main():# 按顺序在任务列表中分别添加四个任务tasks = [asyncio.create_task(func(i)) for i in range(1, 5)]# 组合协程的执行时间取决于tasks中最耗时的那个任务complete = await asyncio.gather(*tasks)# 只有所有任务都执行完毕才能执行下面的语句# 返回值顺序同tasks内部元素的定义顺序for i in complete:print("当前数字: ", i)asyncio.run(main())# 运行结果:
# start: 1
# start: 2
# start: 3
# start: 4
# end: 1
# end: 2
# end: 3
# end: 4
# 当前数字: 1
# 当前数字: 2
# 当前数字: 3
# 当前数字: 4
- 程序开始时事件循环中仅有 main() 协程,因此首先执行主协程 main()
- 主协程先后将 func(1)、func(2)、func(3)、func(4) 四个协程作为任务添加到 tasks 中,随后 asyncio.gather(*tasks) 执行由 tasks 组成的组合协程,因此这四个协程也被先后添加到事件循环中。此时主协程在 complete = await asyncio.gather(*tasks) 处被挂起,等待组合协程全部运行完毕后的返回结果
- 事件循环首先执行 func(1),运行到 asyncio.sleep(0.1) 时,func(1) 协程会被挂起,事件循环在这短暂的睡眠时间(模拟 IO)查询其他可以运行的协程。由于 main()、func(1) 都被挂起,此时事件循环中还剩余 func(2)、func(3)、func(4) 三个协程
- 事件循环随后执行 func(2),运行到 asyncio.sleep(0.2) 时,func(2) 协程也被挂起,此时事件循环中还剩余 func(3)、func(4) 两个协程
- 事件循环执行 func(3),运行到 asyncio.sleep(0.3) 时,func(3) 被挂起,事件循环中还剩 func(4) 协程
- 事件循环执行 func(4),运行到 asyncio.sleep(0.4) 时,func(4) 被挂起。此时事件循环中没有其他可执行的任务,因此等待读秒结束
- 由于 func(1) 只睡 0.1 秒,因此首先苏醒。事件循环接着 func(1) 协程 asyncio.sleep(0.1) 的地方继续往下运行,打印字符后 func(1) 结束运行
- 又过了约 0.1 秒,asyncio.sleep(0.2) 读秒结束,事件循环接着 func(2) 协程 asyncio.sleep(0.2) 的地方继续往下运行,打印字符后 func(2) 也结束运行
- 同理,func(3) 和 func(4) 也先后苏醒,并接着挂起的地方运行直至结束
- 结果返回到主协程 main() 的 complete 中,四个协程的返回结果按顺序打印,程序结束
6. 使用 Semaphore 限制并发数
由于异步编程也仅是单线程运行,为了防止服务超载,我们有时候需要使用 asyncio.Semaphore(n) 限制最大并发数量。
asyncio.Semaphore(n) 内部管理一个计数器,计数器的初始值为 n,即最大并发数量。该计数器由 acquire() 调用递减,release() 调用递增,且计数器永远不会低于零。
如果并发数没有达到上限,那么 acquire() 会瞬间执行完成,进入正式代码中。如果并发数已经达到了限制,那么其他的协程会阻塞在 acquire() 这个地方,直到正在运行的某个协程调用 release(),才会放行一个新的协程。
import asyncio
from datetime import datetimeasync def func(n, semaphore):print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} enter") # 第5行# ----------------------------------------------------------------------------------await semaphore.acquire() # 第7行print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} start") # 第8行await asyncio.sleep(2) # 第9行print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} Semaphore(value={semaphore._value}, locked={semaphore.locked()})")semaphore.release() # 第11行# ----------------------------------------------------------------------------------# 横线里的这段代码等价于# async with semaphore:# print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} start")# await asyncio.sleep(2)# print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} Semaphore(value={semaphore._value}, locked={semaphore.locked()})")print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} end") # 第18行return nasync def main():semaphore = asyncio.Semaphore(3)tasks = [asyncio.create_task(func(i, semaphore)) for i in range(6)]complete = await asyncio.gather(*tasks)for i in complete:print("当前数字: ", i)asyncio.run(main())# 运行结果:
# time: 14:15:29 func 0 enter
# time: 14:15:29 func 0 start
# time: 14:15:29 func 1 enter
# time: 14:15:29 func 1 start
# time: 14:15:29 func 2 enter
# time: 14:15:29 func 2 start
# time: 14:15:29 func 3 enter
# time: 14:15:29 func 4 enter
# time: 14:15:29 func 5 enter
# time: 14:15:31 func 0 Semaphore(value=0, locked=True)
# time: 14:15:31 func 0 end
# time: 14:15:31 func 1 Semaphore(value=1, locked=False)
# time: 14:15:31 func 1 end
# time: 14:15:31 func 2 Semaphore(value=2, locked=False)
# time: 14:15:31 func 2 end
# time: 14:15:31 func 3 start
# time: 14:15:31 func 4 start
# time: 14:15:31 func 5 start
# time: 14:15:33 func 3 Semaphore(value=0, locked=True)
# time: 14:15:33 func 3 end
# time: 14:15:33 func 4 Semaphore(value=1, locked=False)
# time: 14:15:33 func 4 end
# time: 14:15:33 func 5 Semaphore(value=2, locked=False)
# time: 14:15:33 func 5 end
# 当前数字: 0
# 当前数字: 1
# 当前数字: 2
# 当前数字: 3
# 当前数字: 4
# 当前数字: 5
- 事件循环执行主协程 main(),主协程运行至 asyncio.gather(*tasks) 处挂起,等待组合协程的运行结果
- 首先执行任务列表中的 func0,打印“func 0 enter”,由于此时并发数为1,因此 acquire() 瞬间执行完毕并打印“func 0 start”。随后遇到 asyncio.sleep(2),func0 被挂起
- 事件循环执行 func1,打印“func 1 enter”,由于此时并发数为2,因此 acquire() 瞬间执行完毕并打印“func 1 start”。之后遇到 asyncio.sleep(2),func1 也被挂起
- func2 同理,执行到 asyncio.sleep(2) 时被挂起。
- 随后执行 func3,打印“func 3 enter”,运行到 acquire() 时,由于此时并发数量为 3(func0、func1、func2 并未执行 release()),因此 func3 卡在 acquire() 处并被挂起
- 执行 func4,打印“func 4 enter”,由于并发数量已达上限,因此 func4 卡在 acquire() 处并被挂起; func5 同理,打印“func 5 enter”后在 acquire() 处挂起
- 因为事件循环中现在没有其他可执行的协程,所以等待 sleep 读秒结束
- func0、func1、func2 同样是睡 2 秒,func0 先睡因此先醒。事件循环接着 func0 的 asyncio.sleep(2) 挂起处继续往下执行 ,运行到 release() 时,并发数减1。此时允许其他协程执行 acquire(),不过因为是单线程运行,还没运行到 func3,因此 func3 依然处于挂起状态。func0 打印两段信息后运行结束
- func1 随后苏醒,接着 func1 asyncio.sleep(2) 挂起处继续往下执行,运行到 release() 时,并发数再减1。func1 打印两段信息后运行结束;func2 同理
- 此时回到 func3 acquire() 挂起处继续往下执行,由于此时的并发数量为0,因此 acquire() 瞬间执行完毕并在 asyncio.sleep(2) 处挂起
- 随后回到 func4 acquire() 挂起处继续往下执行......
7. 运行阻塞任务
阻塞任务是指阻止当前线程继续进行的任务,如果在 asyncio 程序中执行阻塞任务,它会停止整个事件循环,从而阻止其他协程在后台运行。
我们可以通过 asyncio.to_thread(func()) 函数在程序中另开一个单独的线程,异步运行阻塞任务,该函数返回一个可被等待以获取 func() 最终结果的协程。
import asyncio
import time
import datetimedef blocking_task():print(f"time: {datetime.datetime.now().strftime('%H:%M:%S')} blocking_task start")time.sleep(5)print(f"time: {datetime.datetime.now().strftime('%H:%M:%S')} blocking_task end")return "blocking_task Done"async def func(n):print(f"time: {datetime.datetime.now().strftime('%H:%M:%S')} func {n} start")await asyncio.sleep(2)print(f"time: {datetime.datetime.now().strftime('%H:%M:%S')} func {n} end")return f"func {n} Done"async def main():# 可以尝试用blocking_task(); tasks = [asyncio.create_task(func(i)) for i in range(3)]替换下面这句, 看看会发生什么tasks = [asyncio.to_thread(blocking_task)] + [asyncio.create_task(func(i)) for i in range(3)]complete = await asyncio.gather(*tasks)for i in complete:print("当前结果: ", i)asyncio.run(main())# 运行结果:
# time: 15:00:53 func 0 start
# time: 15:00:53 func 1 start
# time: 15:00:53 func 2 start
# time: 15:00:53 blocking_task start
# time: 15:00:55 func 0 end
# time: 15:00:55 func 1 end
# time: 15:00:55 func 2 end
# time: 15:00:58 blocking_task end
# 当前结果: blocking_task Done
# 当前结果: func 0 Done
# 当前结果: func 1 Done
# 当前结果: func 2 Done
可以看到,另起一个线程异步运行阻塞任务时,完全不影响事件循环中其他协程的运行。
8. 异步迭代器 async for
for 循环遍历一个可迭代对象时,遍历过程中无法执行其他任务,而 async for 语法允许我们在异步环境下遍历可迭代对象。
import asyncioasync def a_generator():for i in range(3):await asyncio.sleep(1) # 模拟IO耗时操作yield iasync def iter_func():print(f"iter_func start")async for item in a_generator():print(item)print(f"iter_func end")return 'iter_func Done'async def func(i):print(f"func{i} start")await asyncio.sleep(1.5) # 模拟IO耗时操作print(f"func{i} end")return f'func{i} Done'async def main():tasks = [asyncio.create_task(func(1)), asyncio.create_task(iter_func()), asyncio.create_task(func(2))]complete = await asyncio.gather(*tasks)for i in complete:print("当前结果: ", i)asyncio.run(main())# 运行结果:
# func1 start
# iter_func start
# func2 start
# 0
# func1 end
# func2 end
# 1
# 2
# iter_func end
# 当前结果: func1 Done
# 当前结果: iter_func Done
# 当前结果: func2 Done
从上述示例中可以看出,运行 async for item in a_generator() 时不会因为还没迭代结束就阻塞事件循环,而是会在 IO 时协调运行其他协程。
9. 异步上下文管理器 async with
使用 with 进行上下文管理,解释器会在进入时自动调用 __enter__ 方法,退出时调用 __exit__ 方法。整个流程顺序执行,因而无法在 __enter__ 与 do_something()、do_something() 与 __exit__ 之间穿插其他任务。
class Example:def __enter__(self):print('enter') #进入资源return selfdef __exit__(self, exc_type, exc_val, exc_tb):print('exit') #释放资源def do_something(self):print('do_something')with Example() as example:example.do_something()# 运行结果:
# enter
# do something
# exit
async with 称为异步上下文管理器,能够将其进入的 __enter__ 和退出的 __exit__ 函数暂时挂起,以执行事件循环中的其他协程。为了实现这样的功能,需要加入两个新的方法:__aenter__ 和 __aexit__,这两个方法都需要返回 awaitable 类型的值。
import asyncioasync def log(text):await asyncio.sleep(0.1)print(text)class AsyncContextManager:async def __aenter__(self):await log('entering context')async def __aexit__(self, exc_type, exc, tb):await log('exiting context')async def run_async_with():print("async with func start")async with AsyncContextManager() as c:print("使用 async with 来管理异步上下文")print("async with func end")return f"async with func Done"async def func(i):print(f"func{i} start")await asyncio.sleep(1) # 模拟IO耗时操作print(f"func{i} end")return f'func{i} Done'async def main():tasks = [asyncio.create_task(func(1)),asyncio.create_task(run_async_with()),asyncio.create_task(func(2))]complete = await asyncio.gather(*tasks)for i in complete:print("当前结果: ", i)asyncio.run(main())# 运行结果:
# func1 start
# async with func start
# func2 start
# entering context
# 使用 async with 来管理异步上下文
# exiting context
# async with func end
# func1 end
# func2 end
# 当前结果: func1 Done
# 当前结果: async with func Done
# 当前结果: func2 Done
10. 参考
【精选】多任务编程事件循环_fastapi事件循环代码入口_发呆的比目鱼的博客-CSDN博客
从0到1,Python异步编程的演进之路 - 知乎 (zhihu.com)
Python协程 & 异步编程(asyncio) 入门介绍 - 知乎 (zhihu.com)
一份详细的asyncio入门教程 - 知乎 (zhihu.com)
Python 为什么需要async for和async with|极客笔记 (deepinout.com)
with与async with - 简书 (jianshu.com)
Python 3.5+ 协程 ( coroutines ) 之 async with 表达式 - 简单教程,简单编程 (twle.cn)
Python asyncio.Semaphore用法及代码示例 - 纯净天空 (vimsky.com)
如何使用 asyncio 限制协程的并发数 - 侃豺小哥 - 博客园 (cnblogs.com)
Python 异步: 在 Asyncio 中运行阻塞任务(14) - 知乎 (zhihu.com)
如有错误,欢迎指正!近期也在加紧制作一期 FastChat 大模型部署时的并发及调度原理详解,敬请期待
发布于 2023-11-17 01:18・IP 属地上海