一、什么是aiohttp库
aiohttp库官网:https://docs.aiohttp.org/en/stable/
aiohttp是一个Python的HTTP客户端/服务器框架,它基于asyncio库实现异步编程模型,可以支持高性能和高并发的HTTP通信。aiohttp用于编写异步的Web服务器、Web应用程序或HTTP客户端,以提供对Web和HTTP资源的访问和操作。
在aiohttp中,HTTP客户端提供了一种发送HTTP请求和处理响应的异步方式,而HTTP服务器则提供了一种异步处理HTTP请求和响应的方式。在使用aiohttp编写Web应用程序时,我们可以选择使用内置的路由器和视图功能来处理HTTP请求,并使用异步模板引擎来渲染HTML页面。
aiohttp支持WebSocket协议,使得我们可以轻松地在应用程序中实现实时通信和数据推送。aiohttp的API设计简单、易用,与Python开发者熟悉的asyncio风格一致。因此,即使没有使用过aiohttp也可以较快上手。 aipyhttp适用于那些需要高性能、高吞吐量、高并发的Python网络应用场景,如实时聊天、在线游戏、大数据分析等。
二、如何安装aiohttp
pip install aiohttp
import aiohttpasync def main():headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}timeout = aiohttp.ClientTimeout(total=10) # 设置总超时时间为10秒async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:async with session.get('https://www.example.com/') as response:html = await response.text()print(html)if __name__ == '__main__':asyncio.run(main())
三、请求和相应
1、post请求
服务器端代码
from aiohttp import webasync def handle(request):data = await request.post()name = data.get('name', "Anonymous")age = data.get('age', "Unknown")text = f"Hello, {name}, you're {age} years old."return web.Response(text=text)app = web.Application()
app.add_routes([web.get('/', handle),web.post('/', handle)])if __name__ == '__main__':web.run_app(app)
客户端代码
import aiohttp
import asyncioasync def post_data(session, url):data = {'name': 'John', 'age': 30}async with session.post(url, data=data) as response:return await response.text()async def main():async with aiohttp.ClientSession() as session:html = await post_data(session, 'http://localhost:8080/')print(html)loop = asyncio.get_event_loop()
loop.run_until_complete(main())
2、get请求
服务器端代码
from aiohttp import webasync def handle(request):name = request.match_info.get('name', "Anonymous")text = f"Hello, {name}"return web.Response(text=text)app = web.Application()
app.add_routes([web.get('/', handle),web.get('/{name}', handle)])if __name__ == '__main__':web.run_app(app)
客户端代码
import aiohttp
import asyncioasync def fetch(session, url):async with session.get(url) as response:return await response.text()async def main():async with aiohttp.ClientSession() as session:html = await fetch(session, 'http://localhost:8080/John')print(html)loop = asyncio.get_event_loop()
loop.run_until_complete(main())
3、获取响应内容
import aiohttp
import asyncioasync def fetch(session, url):async with session.get(url) as response:# 获取状态码status = response.status# 获取响应头headers = response.headers# 以文本形式获取响应正文text = await response.text()# 以字节形式获取响应正文content = await response.read()return status, headers, text, contentasync def main():async with aiohttp.ClientSession() as session:status, headers, text, content = await fetch(session, 'https://www.example.com')print(status)print(headers)print(text)print(content)loop = asyncio.get_event_loop()
loop.run_until_complete(main())
四、Cookies和Session
from aiohttp import webasync def handle(request):session = await request.session()count = session.get('count', 0)count += 1session['count'] = counttext = f"Visited {count} times."return web.Response(text=text)app = web.Application()
app.add_routes([web.get('/', handle)])if __name__ == '__main__':web.run_app(app)
启用服务起session功能
rom aiohttp import webasync def handle(request):session = await request.session()count = session.get('count', 0)count += 1session['count'] = counttext = f"Visited {count} times."return web.Response(text=text)app = web.Application()
app.add_routes([web.get('/', handle)])
app.cleanup_ctx.append(lambda app: setup_session(app))async def setup_session(app):app['session'] = await aiohttp_session.setup(app, aiohttp_session.SimpleCookieStorage())if __name__ == '__main__':web.run_app(app)
五、异步处理
1、协程
import aiohttp
import asyncioasync def fetch(session, url):async with session.get(url) as response:return await response.text()async def main():async with aiohttp.ClientSession() as session:html = await fetch(session, 'https://www.example.com')print(html)loop = asyncio.get_event_loop()
loop.run_until_complete(main())
2、回调函数
import aiohttp
import asynciodef handle_response(response):print(response.status)asyncio.get_event_loop().stop()async def main():async with aiohttp.ClientSession() as session:url = 'https://www.example.com'async with session.get(url) as response:response.add_done_callback(handle_response)await asyncio.sleep(1)loop = asyncio.get_event_loop()
loop.run_until_complete(main())
六、SSL认证
1、启动ssl认证
import aiohttp
import asyncioasync def main():async with aiohttp.ClientSession() as session:async with session.get('https://www.example.com') as response:print(await response.text())loop = asyncio.get_event_loop()
loop.run_until_complete(main())
2、使用自签名SSL证书的aiohttp示例
import aiohttp
import asyncio
import sslasync def main():ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)ssl_context.load_verify_locations('/path/to/cert.pem')async with aiohttp.ClientSession() as session:async with session.get('https://www.example.com', ssl=ssl_context) as response:print(await response.text())loop = asyncio.get_event_loop()
loop.run_until_complete(main())
七、 WebSockets
WebSocket是一种协议,用于在客户端和服务器之间进行实时双向通信。它允许数据在客户端和服务器之间以全双工方式流动,并且不需要使用HTTP请求/响应周期来发送数据。
WebSocket通过在客户端和服务器之间建立持久连接来实现实时通信。这个连接是基于TCP协议的,但是与HTTP请求/响应不同,它只需要一个握手过程,就可以在客户端和服务器之间创建长期的连接
# 使用aiohttp实现WebSocket
import aiohttp
import asyncioasync def websocket_handler(request):ws = aiohttp.web.WebSocketResponse()await ws.prepare(request)async for msg in ws:if msg.type == aiohttp.WSMsgType.TEXT:await ws.send_str('Hello, ' + msg.data)elif msg.type == aiohttp.WSMsgType.ERROR:breakreturn wsapp = aiohttp.web.Application()
app.add_routes([aiohttp.web.get('/', websocket_handler)])if __name__ == '__main__':aiohttp.web.run_app(app)