MST

星途 面试题库

面试题:Python异步I/O并发编程的深度剖析

在Python中使用`asyncio`库进行异步I/O编程。假设你要开发一个高并发的网络爬虫,需要从多个网站快速抓取数据。请详细阐述`asyncio`库的事件循环机制、协程概念以及如何利用`asyncio`实现高效的并发请求。同时,说明如何处理异步操作中的异常,以及在大规模并发场景下如何优化性能,避免资源耗尽和性能瓶颈。
49.9万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

1. asyncio库的事件循环机制

  • 事件循环定义:事件循环是asyncio的核心,它是一个无限循环,负责不断地检查和执行已经准备好(就绪)的任务(协程对象包装后的任务)。它会在不同的任务之间进行切换,让每个任务都有机会执行一段时间,模拟出并发执行的效果。
  • 工作原理:事件循环会维护一个任务队列,新创建的任务(如通过asyncio.create_task()创建)会被添加到这个队列中。事件循环会不断地从队列中取出任务,执行任务中的可运行部分(直到遇到await语句,该任务会被挂起,等待await的对象完成),然后将任务放回队列或根据完成情况移除。同时,事件循环还会监听I/O操作的状态变化,当I/O操作完成(例如网络请求返回数据),相关的任务会被重新标记为可运行状态,加入任务队列等待执行。

2. 协程概念

  • 协程定义:在Python中,协程是一种基于生成器的异步编程模型。使用async def关键字定义的函数就是一个协程函数,调用协程函数不会立即执行函数体,而是返回一个协程对象。协程对象可以被await,这会暂停当前协程的执行,直到被await的协程完成。
  • 与生成器的关系:协程基于生成器扩展而来。普通生成器使用yield暂停和恢复执行,而协程使用await暂停执行,并在等待的操作完成后恢复。协程可以暂停执行,让出控制权给事件循环,使得其他协程有机会执行,实现高效的并发。

3. 利用asyncio实现高效的并发请求

import asyncio
import aiohttp


async def fetch(session, url):
    try:
        async with session.get(url) as response:
            return await response.text()
    except aiohttp.ClientError as e:
        print(f"请求 {url} 时发生错误: {e}")


async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results


if __name__ == "__main__":
    urls = ["http://example.com", "http://another-example.com"]
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(urls))
    print(results)
  • 创建任务:使用asyncio.create_task()asyncio.gather()创建任务。asyncio.create_task()用于将一个协程包装成任务并立即安排到事件循环中执行;asyncio.gather()接受多个协程对象,将它们包装成任务并并行执行,返回一个包含所有任务结果的Future对象。
  • 使用aiohttp:在异步网络请求中,aiohttp是常用的库。它提供了异步的HTTP客户端和服务器功能,与asyncio无缝集成。通过async with aiohttp.ClientSession()创建一个会话对象,在会话对象上使用getpost等方法发起异步请求。

4. 处理异步操作中的异常

  • 捕获单个协程异常:在单个协程函数内部,可以使用try - except块捕获异常。如上述代码中fetch函数内捕获aiohttp.ClientError异常,这样在请求单个URL出现错误时,可以进行相应处理而不影响其他请求。
  • 捕获多个任务异常:当使用asyncio.gather()运行多个任务时,可以通过设置return_exceptions=True参数。这样如果某个任务抛出异常,asyncio.gather()不会立即引发异常,而是将异常作为结果的一部分返回。例如:
async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"任务 {i} 发生异常: {result}")
            else:
                print(f"任务 {i} 成功: {result}")
        return results

5. 在大规模并发场景下的性能优化

  • 限制并发数:过多的并发请求可能导致资源耗尽(如文件描述符用尽、内存不足等)。可以使用asyncio.Semaphore来限制并发数。例如:
import asyncio
import aiohttp


async def fetch(session, url, semaphore):
    async with semaphore:
        try:
            async with session.get(url) as response:
                return await response.text()
        except aiohttp.ClientError as e:
            print(f"请求 {url} 时发生错误: {e}")


async def main(urls):
    semaphore = asyncio.Semaphore(100)  # 限制并发数为100
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        return results


if __name__ == "__main__":
    urls = ["http://example.com"] * 1000
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(urls))
    print(results)
  • 连接池复用aiohttp.ClientSession内部已经实现了连接池。通过复用连接,可以减少建立新连接的开销。避免频繁创建和销毁会话对象,尽量在整个应用生命周期内复用同一个会话对象。
  • 合理设置超时:为每个请求设置合理的超时时间,避免长时间等待无响应的请求占用资源。在aiohttp中,可以在getpost等请求方法中设置timeout参数。例如:async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:,这里设置总超时时间为10秒。