面试题答案
一键面试import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
content = await response.text()
return response.status, len(content)
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for status, length in results:
print(f"状态码: {status}, 响应内容长度: {length}")
if __name__ == "__main__":
urls = [
'http://example.com',
'http://example.org',
'http://example.net'
]
asyncio.run(fetch_all(urls))
上述代码首先定义了一个fetch
函数,用于发起单个HTTP GET请求并返回响应状态码和响应内容长度。fetch_all
函数则负责创建aiohttp.ClientSession
,并为每个URL创建一个fetch
任务,使用asyncio.gather
等待所有任务完成,最后按顺序输出每个请求的响应状态码和响应内容长度。在if __name__ == "__main__"
块中,定义了一组URL并调用fetch_all
函数来执行异步请求。