【python高级】asyncio 并发编程

【大家好,我是爱干饭的猿,本文重点介绍python高级篇的事件循环,task取消和协程嵌套、call_soon、call_later、call_at、 call_soon_threadsafe、asyncio模拟http请求、asyncio同步和通信、aiohttp实现高并发实践。

后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】

上一篇文章:《【python高级】多线程、多进程和线程池编程》

1. 事件循环

  • 包含各种特定系统实现的模块化事件循环
  • 传输和协议抽象
  • 对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持
  • 模仿futures模块但适用于事件循环使用的Future类
  • 基于yield from的协议和任务,可以让你用顺序的方式编写并发代码
  • 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池
  • 模仿threading模块中的同步原语、可以用在单线程内的协程之间

1.1 开始一个协程

# 事件循环回调(驱动生成器)+epollIO多路复用)
# asyncio是python用于解决异io编程的一整套解决方案
# tornado, gevent, twisted (scrapy.django channels)
# tornado(实现web服务器) django+flask(uwsgi gunicorn+nginx)
# tornado可以直接部署,nginx+tornadoimport asyncio
import timeasync def get_html(url):print("start get url")# time.sleep(2)  执行此代码是顺序执行await asyncio.sleep(2)print("end get url")if __name__ == '__main__':loop = asyncio.get_event_loop()tasks = [get_html("www.baidu.com") for i in range(10)]start_time = time.time()loop.run_until_complete(asyncio.wait(tasks))end_time = time.time()print("耗时:{}".format(end_time-start_time))

1.2 获取协程返回值和callback逻辑

import asyncio
import time
from functools import partial# 获取协程返回值和callback逻辑
async def get_html(url):print("start get url")await asyncio.sleep(2)return "body"def callback(url, future):print("url:{}".format(url))print("send email to me")if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 方式1# get_future = asyncio.ensure_future(get_html("www.baidu.com"))# loop.run_until_complete(get_future)# print(get_future.result())# print("耗时:{}".format(time.time()-start_time))# 方式2# task = loop.create_task(get_html("www.baidu.com"))# loop.run_until_complete(task)# print(task.result())# print("耗时:{}".format(time.time()-start_time))# 加入callbacktask = loop.create_task(get_html("www.baidu.com"))# task.add_done_callback(callback) callback未传入参数task.add_done_callback(partial(callback, "www.baidu.com"))loop.run_until_complete(task)print(task.result())print("耗时:{}".format(time.time()-start_time))

1.3 await 和 gather

import asyncio
import timeasync def get_html(url):print("start get url")await asyncio.sleep(2)print("end get url")if __name__ == '__main__':loop = asyncio.get_event_loop()start_time = time.time()# gather和wait的区别# gather更加high-leveL# gather可以进行分组# # 方法1# group1 = [get_html("www.baidu.com") for i in range(2)]# group2 = [get_html("www.baidu.com") for i in range(2)]# loop.run_until_complete(asyncio.gather(*group1, *group2))# 方法2group1 = [get_html("www.baidu.com") for i in range(2)]group2 = [get_html("www.baidu.com") for i in range(2)]group1 = asyncio.gather(*group1)group2 = asyncio.gather(*group2)loop.run_until_complete(asyncio.gather(group1, group2))print("耗时:{}".format(time.time()-start_time))

2. task取消和协程嵌套

2.1 task 取消

import asyncio
import timeasync def get_html(sleep_time):print("waiting")await asyncio.sleep(sleep_time)print("done after {}s".format(sleep_time))if __name__ == '__main__':task1 = get_html(2)task2 = get_html(3)task3 = get_html(3)tasks = [task1, task2, task3]loop = asyncio.get_event_loop()try:loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt as e:# 此报错可以在Linux中运行时按 ctrl+c 复现all_tasks = asyncio.all_tasks()for task in all_tasks:print("task cancel")print(task.cancel())loop.stop()loop.run_forever()finally:loop.close()

2.2 协程嵌套

import asyncioasync def compute(x,y):print("Compute %s + %s ..." % (x, y))await asyncio.sleep(1.0)return x + yasync def print_sum(x, y):result = await compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

子协程调用原理:
在这里插入图片描述

3. call_soon、call_later、call_at、 call_soon_threadsafe

import asynciodef callback(sleep_times):print("sleep {} success".format(sleep_times))def stoploop(loop):loop.stop()if __name__ == '__main__':loop = asyncio.get_event_loop()# 1. call_soon立即执行# loop.call_soon(callback, 2)# loop.call_soon(callback, 1)# 2. call_later 后于call_soon执行,多个call_later按照delay顺序执行# loop.call_later(2, callback, 2)# loop.call_later(1, callback, 1)# loop.call_later(3, callback, 3)# 3. call_at 指定时间执行# cur_time = loop.time()# loop.call_at(cur_time+2, callback, 2)# loop.call_at(cur_time+1, callback, 1)# loop.call_at(cur_time+3, callback, 3)# 4. call_soon_threadsafe和call_soon 使用方法一致,但是是线程安全的loop.call_soon_threadsafe(callback, 1)loop.call_soon(stoploop, loop)loop.run_forever()

4. ThreadPoolExecutor + asyncio

import asyncio
from concurrent.futures import ThreadPoolExecutordef get_url(url):passif __name__ == "__main__ ":import timestart_time = time.time()loop = asyncio.get_event_loop()executor = ThreadPoolExecutor()tasks = []for url in range(20):url = "http://shop.dd.com/goods/{}/".format(url)task = loop.run_in_executor(executor, get_url, url)tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))print("last time:{".format(time.time() - start_time))

5. asyncio模拟http请求

#requests -> urlib ->socket
import socket
import asyncio
from urllib.parse import urlparseasync def get_url(url ):# 通过socket请求htmLurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = "/"# 建立socket连接# client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 建立socket连接reader, writer = await asyncio.open_connection(host, 80)writer.write("GET {HTTP/1.1r\nHost:{}r\nConnection:close\r\n\r\n".format(path, host).encode( "utf-8"))all_lines = []async for raw_line in reader:data = raw_line.decode("utf8")all_lines.append(data)html = "ln".join(all_lines)return htmlif __name__ == '__main__':import timestart_time = time.time()loop = asyncio.get_event_loop()tasks = []for url in range(20):url = "http: // shop.projectsedu.com/goods/{0}/".format(url)tasks.append(asyncio.ensure_future(get_url(url)))loop.run_until_complete(asyncio.wait(tasks))print('last time:{}'.format(time.time()))

6. asyncio同步和通信

import asyncio
from asyncio import Lock, Queueimport aiohttpcache = {}
lock = Lock()
# queue = Queue() 和 queue = [], 如果需要流量限制就需要使用Queue()async def get_stuff(url) :async with lock:if url in cache:return cache[url]stuff = await aiohttp.request('GET', url)cache[url] = stuffreturn stuffasync def parse_stuff():stuff = await get_stuff()#do some parsingasync def use_stuff():stuff = await get_stuff()#use stuff to do something interesting

7. aiohttp实现高并发实践

import re
import asyncio
import aiohttp
import aiomysql
from pyquery import PyQuerystart_url = 'https://developer.aliyun.com/article/698731'waitting_urls = []
seen_urls = set()
stopping = Falseasync def fetch(url, session):async with session.get(url) as response:if response.status != 200:return Nonetry:return await response.text(encoding='ISO-8859-1')except UnicodeDecodeError:return await response.read()async def init_urls(url, session):html = await fetch(url, session)seen_urls.add(url)extract_urls(html)def extract_urls(html):if html is None or not isinstance(html, str):returnif not html.strip():  # 如果html内容为空或者只包含空白字符returntry:urls = []pq = PyQuery(html)# 在这里继续处理pq对象pq = PyQuery(html)for link in pq.items("a"):url = link.attr("href")if url and url.startswith("http") and url not in seen_urls:urls.append(url)waitting_urls.append(url)return urlsexcept Exception as e:print(f"Failed to parse HTML: {e}")async def article_handler(url, session, pool):# 获取文章详情并解析入库html = await fetch(url, session)seen_urls.add(url)extract_urls(html)pq = PyQuery(html)title = pq("article-title").text()if len(title) == 0:print("No valid title found for article: {}".format(url))else:async with pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute("SELECT 42;")insert_sql = "insert into article_test(title) values('{}')".format(title)await cur.execute(insert_sql)async def consumer(pool):async with aiohttp.ClientSession() as session:while not stopping:if len(waitting_urls) == 0:await asyncio.sleep(1)continueurl = waitting_urls.pop()print("start get url: {}".format(url))if re.match('https://world.taobao.com', url):continueif re.match('http://.*?developer.aliyun.com/article/\d+/', url):if url not in seen_urls:asyncio.ensure_future(article_handler(url, session, pool))await asyncio.sleep(2)  # 适当的等待时间else:if url not in seen_urls:await init_urls(url, session)async def main(loop):# 等待MySQL建立连接pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,user='root', password='*****',db='aiomysql_test', loop=loop,charset='utf8', autocommit=True)async with aiohttp.ClientSession() as session:html = await fetch(start_url, session)seen_urls.add(start_url)extract_urls(html)asyncio.ensure_future(consumer(pool))if __name__ == '__main__':loop = asyncio.get_event_loop()asyncio.ensure_future(main(loop))try:loop.run_forever()except KeyboardInterrupt:stopping = Trueloop.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/168208.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

error C2143的原因及解决办法

error C2143的原因及解决办法 在C编程中,经常会遇到各种错误。其中之一就是error C2143。本文将讨论error C2143的原因,并给出相应的解决办法。 error C2143通常是由于语法错误引起的。具体而言,C2143错误表示编译器无法识别代码中的某个符…

UE5蓝图接口使用方法

在内容区右键创建蓝图接口 命名自定义(可以用好识别的) 双击打开后关闭左边窗口 右键函数 -- 重命名 -- 名称自定义(用好记的) 点击下边输入后面的 号创建一个变量 点击编译并保存 在一个蓝图类里面 -- 点击类设置 在右侧已实现的…

python3GUI--QQ音乐By:PyQt5(附下载地址)

文章目录 一.前言二.展示0.播放页1.主界面1.精选2.有声电台3.排行4.歌手5.歌单 2.推荐3.视频1.视频2.分类3.视频分类 4.雷达5.我喜欢1.歌曲2.歌手 6.本地&下载7.最近播放8.歌单1.一般歌单2.自建歌单3.排行榜 9.其他1.搜索词推荐2.搜索结果 三&#x…

日语形容词分类

かっこいい的否定变形是かっこよくない

conda修改虚拟环境名称

conda 修改虚拟环境名称 conda 不能直接更改名称,但是可以通过克隆环境解决 新建环境(克隆旧环境) conda create --name 新环境名 --clone 旧环境名 删除原环境 conda remove --name 旧环境名 --all 查看现有环境 conda env list conda i…

从零开始开发抖音小程序:与餐饮团购的完美融合

本文将探讨如何从零开始开发一个创新的抖音小程序,以其独特的特性与餐饮团购进行完美融合。 一、什么是抖音小程序? 抖音小程序为开发者提供了在用户观看视频时进行无缝体验的机会。通过借助抖音的庞大用户基础,开发者可以将自己的创意呈现给…

牛客网刷题笔记131111 Python实现LRU+二叉树先中后序打印+SQL并列排序

从学校步入职场一年多,已经很久没刷过题了,为后续稍微做些提前的准备,还是重新开始刷刷题。 从未做过计划表,这回倒是做了个计划表,希望能坚持吧。 刷题比较随性且量级不大,今天就写了2个算法2个sql&#x…

简单版本管理服务编写

说明: 制作android应用内更新的时候,经常会用到版本检查,下载,安装,这时候需要写一个版本管理服务。 本文说明了自己编写版本服务的简单经过。 解决方案: 该软件实现如下功能: 创建后台接口:版本软件上传…

Java_继承和多态

文章目录 前言继承继承语法继承总结super指定访问父级子类构造方法super和this再谈初始化(执行顺序)protected 关键字继承方式final 关键字继承与组合 多态动态绑定与静态绑定多态实现条件重写 前言 适合复习看 继承 继承语法 修饰符 class 子类 extends 父类 { // ... }子类…

归并排序 merge Sort + 图解 + 递归 / 非递归

归并排序(merge sort)的主要思想是:将若干个有序序列逐步归并,最终归并为一个有序序列二路归并排序(2-way merge sort)是归并排序中最简单的排序方法 (1)二路归并排序的递归实现 // 二路归并排序的递归实现 void merge(vector&l…

STM32F4X SDIO(九) 例程讲解-SD卡擦除、读写

STM32F4X SDIO (九) 例程讲解-SD卡擦除、读写 例程讲解-SD卡擦除、读写SD卡擦除CMD32:ERASE_WR_BLK_START命令发送命令响应 CMD33:ERASE_WR_BLK_END命令发送命令响应CMD38:ERASE命令响应 CMD13:SD_CMD_SEND_STATUS命令发送命令回应 SD卡读数据CMD16:SET_…

[Mac软件]Adobe Media Encoder 2024 V24.0.2免激活版

软件说明 使用Media Encoder,您将能够处理和管理多媒体。插入、转码、创建代理版本,并几乎以任何可用的格式输出。在应用程序中以单一方式使用多媒体,包括Premiere Pro、After Effects和Audition。 紧密整合 与Adobe Premiere Pro、After …