面试题答案
一键面试import asyncio
class AsyncResourceManager:
def __init__(self):
self.resource = None
async def __aenter__(self):
# 异步初始化操作
self.resource = await self._acquire_resource()
return self.resource
async def __aexit__(self, exc_type, exc, tb):
# 处理异常
if exc_type:
print(f"处理异常: {exc_type}, {exc}")
# 异步清理操作
await self._release_resource(self.resource)
async def _acquire_resource(self):
# 模拟异步获取资源
await asyncio.sleep(1)
return "异步资源"
async def _release_resource(self, resource):
# 模拟异步释放资源
await asyncio.sleep(1)
print(f"释放资源: {resource}")
async def main():
async with AsyncResourceManager() as resource:
print(f"使用资源: {resource}")
# 模拟异步操作
await asyncio.sleep(2)
if __name__ == "__main__":
asyncio.run(main())
上述代码定义了一个异步上下文管理器 AsyncResourceManager
,在 __aenter__
方法中进行异步资源的获取,__aexit__
方法中处理异常并进行资源的释放。在 main
函数中,使用 async with
语句来管理异步资源的生命周期。