1. asyncio
库的事件循环机制
- 事件循环定义:事件循环是
asyncio
的核心,它是一个无限循环,负责不断地检查和执行已经准备好(就绪)的任务(协程对象包装后的任务)。它会在不同的任务之间进行切换,让每个任务都有机会执行一段时间,模拟出并发执行的效果。
- 工作原理:事件循环会维护一个任务队列,新创建的任务(如通过
asyncio.create_task()
创建)会被添加到这个队列中。事件循环会不断地从队列中取出任务,执行任务中的可运行部分(直到遇到await
语句,该任务会被挂起,等待await
的对象完成),然后将任务放回队列或根据完成情况移除。同时,事件循环还会监听I/O操作的状态变化,当I/O操作完成(例如网络请求返回数据),相关的任务会被重新标记为可运行状态,加入任务队列等待执行。
2. 协程概念
- 协程定义:在Python中,协程是一种基于生成器的异步编程模型。使用
async def
关键字定义的函数就是一个协程函数,调用协程函数不会立即执行函数体,而是返回一个协程对象。协程对象可以被await
,这会暂停当前协程的执行,直到被await
的协程完成。
- 与生成器的关系:协程基于生成器扩展而来。普通生成器使用
yield
暂停和恢复执行,而协程使用await
暂停执行,并在等待的操作完成后恢复。协程可以暂停执行,让出控制权给事件循环,使得其他协程有机会执行,实现高效的并发。
3. 利用asyncio
实现高效的并发请求
import asyncio
import aiohttp
async def fetch(session, url):
try:
async with session.get(url) as response:
return await response.text()
except aiohttp.ClientError as e:
print(f"请求 {url} 时发生错误: {e}")
async def main(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
if __name__ == "__main__":
urls = ["http://example.com", "http://another-example.com"]
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(urls))
print(results)
- 创建任务:使用
asyncio.create_task()
或asyncio.gather()
创建任务。asyncio.create_task()
用于将一个协程包装成任务并立即安排到事件循环中执行;asyncio.gather()
接受多个协程对象,将它们包装成任务并并行执行,返回一个包含所有任务结果的Future
对象。
- 使用
aiohttp
库:在异步网络请求中,aiohttp
是常用的库。它提供了异步的HTTP客户端和服务器功能,与asyncio
无缝集成。通过async with aiohttp.ClientSession()
创建一个会话对象,在会话对象上使用get
、post
等方法发起异步请求。
4. 处理异步操作中的异常
- 捕获单个协程异常:在单个协程函数内部,可以使用
try - except
块捕获异常。如上述代码中fetch
函数内捕获aiohttp.ClientError
异常,这样在请求单个URL出现错误时,可以进行相应处理而不影响其他请求。
- 捕获多个任务异常:当使用
asyncio.gather()
运行多个任务时,可以通过设置return_exceptions=True
参数。这样如果某个任务抛出异常,asyncio.gather()
不会立即引发异常,而是将异常作为结果的一部分返回。例如:
async def main(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 发生异常: {result}")
else:
print(f"任务 {i} 成功: {result}")
return results
5. 在大规模并发场景下的性能优化
- 限制并发数:过多的并发请求可能导致资源耗尽(如文件描述符用尽、内存不足等)。可以使用
asyncio.Semaphore
来限制并发数。例如:
import asyncio
import aiohttp
async def fetch(session, url, semaphore):
async with semaphore:
try:
async with session.get(url) as response:
return await response.text()
except aiohttp.ClientError as e:
print(f"请求 {url} 时发生错误: {e}")
async def main(urls):
semaphore = asyncio.Semaphore(100) # 限制并发数为100
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
if __name__ == "__main__":
urls = ["http://example.com"] * 1000
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(urls))
print(results)
- 连接池复用:
aiohttp.ClientSession
内部已经实现了连接池。通过复用连接,可以减少建立新连接的开销。避免频繁创建和销毁会话对象,尽量在整个应用生命周期内复用同一个会话对象。
- 合理设置超时:为每个请求设置合理的超时时间,避免长时间等待无响应的请求占用资源。在
aiohttp
中,可以在get
、post
等请求方法中设置timeout
参数。例如:async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
,这里设置总超时时间为10秒。