[PythonAdvanced] 学习笔记 - Day 2
概览
Date: 2024-11-28 Time Spent: 6 hours
Topics: Asyncio Difficulty: ⭐⭐⭐ (1-5 ⭐)
今日计划
学习笔记
-
[asyncio]
-
Main points:
asyncio 是一个使用async/await语法编写并发代码的库。asyncio 通常非常适合 IO 绑定和高级 结构化网络代码。asyncio 提供一组 高层级 API 用于:
- 并发地 运行 Python 协程 并对其执行过程实现完全控制;
- 执行 网络 IO 和 IPC;
- 控制 子进程;
- 通过 队列 实现分布式任务;
- 同步 并发代码;
此外,还有一些 低层级 API 以支持 库和框架的开发者 实现:
- 创建和管理 事件循环,它提供用于 连接网络, 运行 子进程, 处理 OS 信号 等功能的异步 API;
- 使用 transports 实现高效率协议;
- 通过 async/await 语法 桥接 基于回调的库和代码。
-
Example code
import asyncioasync def main():print('Hello ...')await asyncio.sleep(1)print('... World!')asyncio.run(main())
-
Runners Example code
asyncio.run(coro, ***, debug=None)
执行协程coro并返回结果。该函数运行传递的协程,负责管理 asyncio 事件循环、完成异步生成器并关闭线程池。如果 debug 为
True
,事件循环将运行于调试模式。False
将显式地禁用调试模式。 使用None
将沿用全局 Debug 模式 设置。async def main():await asyncio.sleep(1)print('hello')asyncio.run(main())
-
Runners Context Example code
class asyncio.Runner(***, debug=None, loop_factory=None)
对在相同上下文中 多个 异步函数调用进行简化的上下文管理器。loop_factory 可被用来重载循环的创建。 loop_factory 要负责将所创建的循环设置为当前事件循环。 在默认情况下如果 loop_factory 为
None
则会使用asyncio.new_event_loop()
并通过asyncio.set_event_loop()
将其设置为当前事件循环。#run(),close(),get_loop() async def main():await asyncio.sleep(1)print('hello')with asyncio.Runner() as runner:runner.run(main())
-
Coroutines Example code
通过 async/await 语法来声明 Coroutines/协程 是编写 asyncio 应用的推荐方式。
import asyncio import timeasync def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():print(f"started at {time.strftime('%X')}")await say_after(1, 'hello')await say_after(2, 'world')print(f"finished at {time.strftime('%X')}")>>>asyncio.run(main()) started at 17:13:52 hello world finished at 17:13:55
async def main():task1 = asyncio.create_task(say_after(1, 'hello'))task2 = asyncio.create_task(say_after(2, 'world'))print(f"started at {time.strftime('%X')}")# Wait until both tasks are completed (should take# around 2 seconds.)await task1await task2print(f"finished at {time.strftime('%X')}") >>>asyncio.run(main()) started at 17:14:32 hello world finished at 17:14:34
async def main():async with asyncio.TaskGroup() as tg:task1 = tg.create_task(say_after(1, 'hello'))task2 = tg.create_task(say_after(2, 'world'))print(f"started at {time.strftime('%X')}")# The await is implicit when the context manager exits.print(f"finished at {time.strftime('%X')}") >>>asyncio.run(main()) started at 17:14:32 hello world finished at 17:14:34
-
迭代器 (Iterator):
迭代器是一个对象,它实现了 next() 方法,该方法会返回一个包含两个属性的对象:value 和 done。
value 表示当前迭代到的值。done 是一个布尔值,表示迭代是否完成(true 表示完成,false 表示未完成)。
异步迭代器 (Async Iterator):
异步迭代器是迭代器的异步版本。异步迭代器对象的 anext() 方法返回一个awaitable对象,该对象解析后为 value和done,value是迭代的当前值, done 是布尔值。 -
AsyncIterator Example code
import asyncioclass AsyncNumberGenerator:def __init__(self, max_value):self.max_value = max_valueself.current = 0def __aiter__(self):return selfasync def __anext__(self):if self.current <= self.max_value:await asyncio.sleep(1) # 模拟异步操作(例如,等待I/O操作)value = self.currentself.current += 1return valueelse:raise StopAsyncIteration# 使用异步迭代器 async def main():async for number in AsyncNumberGenerator(5):print(number) # 每秒打印一个从0到5的数字# 运行异步主函数 asyncio.run(main())
-
代码练习
"""
异步操作的主要作用是提高程序的效率和响应性,
尤其是在处理I/O密集型任务(如网络请求、文件读写等)时。
通过异步操作,程序可以在等待某些操作(如网络响应)完成的同时,
继续执行其他任务,从而充分利用系统资源。事件循环(event loop)是异步编程的核心机制。
它是一个运行在一个单独线程中的循环,负责调度和执行异步任务和回调。
事件循环会跟踪哪些任务已经准备好执行,哪些任务需要等待某些事件的发生(如I/O操作完成),
并在适当的时候调用相应的回调函数。
"""
#用上下文管理器的方式写asyncio.run()
#async with 用于管理异步上下文,例如文件操作、网络连接等,
# 通常需要在类中定义 __aenter__ 和 __aexit__ 方法。
#asyncio.run() 返回一个结果,通常是协程的返回值。不接受上下文管理器。
import asyncioasync def main():await asyncio.sleep(0.5) #这行代码让异步函数暂停2秒,模拟异步操作#这意味着在 say_hi 函数等待的这段时间里,事件循环可以运行其他任务。#print('Hello World!')
#with asyncio.Runner() as runner:#runner.run(main()) #get_loop()返回关联到运行器实例的事件循环。# 获取当前上下文中的事件循环。如果当前上下文没有事件循环,它会创建一个新的事件循环。
# 运行直到异步函数被完成。
#asyncio.get_event_loop().run_until_complete(main())"""
迭代器 (Iterator):
迭代器是一个对象,它实现了 next() 方法,
该方法会返回一个包含两个属性的对象:value 和 done。
value 表示当前迭代到的值。
done 是一个布尔值,表示迭代是否完成(true 表示完成,false 表示未完成)。
异步迭代器 (Async Iterator):
异步迭代器是迭代器的异步版本。
异步迭代器对象的 anext() 方法返回一个awaitable对象,
该对象解析为 value是迭代的当前值和 done 是布尔值。
"""#异步迭代器不需要 async with,只需要定义 __aiter__ 和 __anext__ 方法。
class asyncfibo:def __init__(self, max_value):self.max_value = max_valueself.previous =0self.current = 1def __aiter__(self):return selfasync def __anext__(self):if self.current <= self.max_value:await asyncio.sleep(self.previous)value = self.currentself.previous, self.current= self.current, self.previous+self.currentreturn valueelse:raise StopAsyncIteration
#使用 async for 迭代异步迭代器。
async def main():async for value in asyncfibo(5):print(value)
#asyncio.run(main())#使用异步生成器实现更多资源的分配
class AsyncGenerator:def __init__(self, name):self.file_name = nameasync def __aenter__(self):#异步进入上下文,打开文件等资源self.op_file = open(self.file_name, 'w')print(f'file: {self.file_name} opened')return self #返回对象供上下文使用async def __aexit__(self, exc_type, exc_val, exc_tb):#异步退出上下文if self.op_file is not None:self.op_file.close()print(f'file: {self.file_name} closed')else:print(exc_type, exc_val, exc_tb)return False #异常未处理则抛出async def write_data(self,data):#这里怎么不加上self.obj.seek(0)await asyncio.sleep(1)self.op_file.write(data)self.op_file.flush() # 确保数据立即写入磁盘print(f'file: {self.file_name} written {data}')async def read_data(self):# 重新打开文件并读取内容with open(self.file_name, 'r') as file:await asyncio.sleep(1) # 模拟异步操作data = file.read()print(f'File: {self.file_name} read: {data}')return dataasync def main():# 使用异步资源管理器async with AsyncGenerator('example.txt') as manager:await manager.write_data("Hello, Async World!\n")await manager.write_data("Managing multiple resources is easy.\n")content = await manager.read_data()print(f"Content:\n{content}")
# 执行主函数
asyncio.run(main())
遇见的挑战难及解决方法和笔记点见注释。槽一下,太抽象了。
The challenges encountered, solutions, and key points for note-taking are indicated in the comments.
使用资源
- FastAPI官方文档
- asyncio官方文档