- 使用协程重要的几个函数
- 使用异步方法爬取Microsoft Bing图片
- asyncio.Queue(maxsize=)
说明1:
正常的程序都是从上到下依次执行的,如果遇到了要等待的地方,就会阻塞,等待相应的代码执行完毕后,再往下执行。
说明2:
协程(Coroutine) 是一种特殊的函数,它可以在执行过程中暂停(挂起),并在稍后恢复执行。协程是异步编程的核心概念,允许程序在等待 I/O 操作(如网络请求、文件读写)时挂起当前任务,转而执行其他任务,从而提高程序的并发性能。
说人话:
使用协程异步处理程序就和做家务差不多,在炖汤的时候,使用洗衣机洗衣服,再去洗碗,洗碗的时候抽出时间看看汤炖的咋样了,就会发现,事情都是同时进行的,而不使用异步的方法,就会发生一件很呆的事情:你炖汤的时候在那等着,等汤炖好再去洗衣服,洗完衣服再去洗碗。
使用协程重要的几个函数
-
import asyncio
首先要下载并导入asyncio这个库,这是协程的库 -
async
对函数进行异步标识,我们可以把一个个的函数类比为一项项家务,在函数前面使用async标识,那么函数(家务)就是异步的,就可以并发的执行,一个函数在运行的时候,不用等待该函数(家务)做完,而是可以直接抽空去执行其他的函数。 -
asyncio.create_task()
上面对函数标识后还不够,还需要使用asyncio.create_task()将函数转换为task任务,这个后面具体的代码再解释 -
await
上面的async和asyncio.create_task()标识函数并把函数转换为task任务后,要想实现异步还需要将这个任务挂起来才能真正实现异步,具体操作看后面的案例。 -
asyncio.gather()、asyncio.wait()
与await一起实现将多个task任务一次挂起 -
asyncio.run()
运行统领所有任务的主函数,也就是你在哪个函数上面创建了异步任务就把相应的函数作为参数交给run()处理
举例
import asyncio
import aiohttp
import aiofiles
import timeasync def wash_glass():print("开始洗碗辣!")#假设洗碗要花费2秒,这里await就是为了将等待的时间加入到事件循环中# ,当要等待的时候,将其挂起来,去执行其他的函数(家务)。await asyncio.sleep(2)print("洗完碗辣!")
async def wash_clothes():print("开始洗衣服辣!")await asyncio.sleep(4)#假设洗衣服要花费4秒print("洗完衣服辣!")async def cooking():print("开始做菜辣!")await asyncio.sleep(5)#假设做菜要花费5秒print("做完菜辣!")#main()函数作用是统领家务,在main()函数这个分支下,各种家务分支并行处理,当家务都做完后,再运行后续的代码
async def main():list_task=[]#存放task任务#使用create_task创建tasklist_task.append(asyncio.create_task(wash_clothes()))list_task.append(asyncio.create_task(wash_glass()))list_task.append(asyncio.create_task(cooking()))#最后将任务挂起来# await asyncio.gather(*list_task)#使用gather的方法将任务集中挂起await asyncio.wait(list_task)#使用wait的方法,将任务集中挂起print("家务都做完了")#所有的异步程序(家务)都完成后,才会执行这条语句if __name__ == '__main__':start=time.time()asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())#不加可能会报错,因为与windows系统冲突asyncio.run(main())#执行统领家务的主函数end=time.time()print(f"总共耗时{end-start}秒")
配合这个图来理解吧:
小总结
- async主要用于标识函数
- await很重要,你觉得哪个任务耗时间,就要在前面使用await将其挂起
- create_task()用于创建任务
- gather()或者wait()与await配合使用,便于再main()函数,集中将多个任务挂起
- asyncio.run()运行主函数
使用异步方法爬取Microsoft Bing图片
说明:之前写了一个爬取Microsoft Bing图片的爬虫案例,这里我将其改为异步的方式,与之前的进行对比,里面会用到没有讲过的aiohttp、aiofiles和asyncio.Queue,这三个的作用分别是:
aiohttp:异步网络请求
aiofiles:异步文件写入
asyncio.Queue(maxsize=):队列,用于在异步中存储数据,以免数据错乱
import asyncio
import aiohttp
import aiofiles
import requests
from lxml import etree
import re
import time
import osasync def load_img_url(SFX,first,count,search_name,queue):url='https://cn.bing.com/images/async'headers={'User-Agent':'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Mobile Safari/537.36 Edg/134.0.0.0',}params={'q':search_name,'first': first,#'count': count,#爬取图片的数量'cw': 437,'ch': 603,'relp': 12,'datsrc': 'I','layout': 'ColumnBased_Landscape','apc': 0,'imgbf': 'DfCtqwgAAACQAQAAAAAAAAAAAAAFAAAAsUuBIONfTNlAgBAASAEAQgAQIFEABiAGIBgAgNDAoQAIUAAIgUYIAAiAQCAAABIiIBgIIAAAACkCAAAAAAAAAA==','mmasync': 2,'dgState': 'c*2_y*725s715_i*13_w*204','IG': '32F3E4B3953D4FFCB5B4E5EDB527C8EC','SFX': SFX,#页数'iid': 'images.5306',}dir_name=search_nameif not os.path.exists(f'./{dir_name}_dir'):os.mkdir(f'./{dir_name}_dir')async with aiohttp.ClientSession() as session:#这里使用的就是aiohttp去进行异步网络请求async with session.get(url=url,headers=headers,params=params) as response:chepai_html=await response.text()#注意这里的await,就是异步请求,这里就是异步请求html页面tree=etree.HTML(chepai_html)img_src_list=tree.xpath('//img/@data-src')#获取图片的urlawait queue.put(img_src_list)#将图片的url列表放入队列中async def download_img(queue,search_name):headers={'User-Agent':'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Mobile Safari/537.36 Edg/134.0.0.0',}try:img_url_list=await queue.get()#从队列中取出图片的url列表,并且这里会阻塞直到队列中有数据,所以后面整个函数形成的任务必须要cancelexcept Exception as e:print("出错辣")for url in img_url_list:async with aiohttp.ClientSession() as session:async with session.get(url,headers=headers) as response:response_img=await response.read()p=re.compile(r'https.*/OIP-C\.(?P<name>.*)\?.*')#这里是异步写入文件async with aiofiles.open(f'./{search_name}_dir/'+p.search(url).group('name')+'.jpg','wb') as f:await f.write(response_img)#注意awaitqueue.task_done()#该函数中的异步任务完成后才会执行,告诉queue,放入队列的第一个数据已经被处理完成,可以拿取下一个数据了async def main():queue=asyncio.Queue(maxsize=1)#这里设置的是队列容量,容量选择适中,选择大了占用资源太多,选择小了,生产数据的函数会经常被阻塞# search_name=input('请输入要爬取的图片名称:')search_name="美女"# search_len=input('请输入要爬取的html页面数量:')list_params=[[2,13,12]]list_task_download=[]list_task=[]for i in range(int(2)):#创建请求图片url的task,上面循环几次就创建几个,这些任务很快就会执行完for j in list_params:list_task.append(asyncio.create_task(load_img_url(j[0],j[1],j[2],search_name,queue)))list_params[0][1]+=list_params[0][2]list_params[0][0]+=1list_params[0][2]+=35#创建下载图片的task,上面循环次几次就创建几个,这些任务里面都有get,就必须使用cancel的形式取消list_task_download.append(asyncio.create_task(download_img(queue,search_name))) await asyncio.gather(*list_task,*list_task_download)# 主函数等待队列中的所有数据被处理,防止队列里面还有数据,但是程序提前结束!await queue.join()for i in list_task_download:i.cancel()# 取消有着queue.get()方法的任务,避免陷入永久阻塞的情况if __name__=='__main__':s=time.time()try:asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())#不加会报错,是因为与windows系统冲突asyncio.run(main())except:print("error")e=time.time()print(f"总共耗时{e-s}秒")
asyncio.Queue(maxsize=)
你会发现加入队列后程序就会复杂一点,要考虑数据安全方面的问题,这里稍微解释一下
老婆喊我,暂时写这么多