面试题答案
一键面试__enter__
和 __exit__
方法的底层实现机制
__enter__
方法:当使用with
语句进入上下文时,会调用对象的__enter__
方法。这个方法的返回值会绑定到with
语句中的目标变量(如果有)。例如:
class MyContext:
def __enter__(self):
print('Entering context')
return self
with MyContext() as ctx:
pass
在这个例子中,MyContext
类的 __enter__
方法返回 self
,并将其赋值给 ctx
。
__exit__
方法:当with
语句块结束时,无论是否发生异常,都会调用对象的__exit__
方法。__exit__
方法接受三个参数:exc_type
(异常类型,如果没有异常则为None
)、exc_value
(异常实例,如果没有异常则为None
)和traceback
(异常的追溯信息,如果没有异常则为None
)。例如:
class MyContext:
def __enter__(self):
print('Entering context')
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
print(f'Exception occurred: {exc_type}, {exc_value}')
else:
print('Exiting context without exception')
与异常处理之间的关联
如果 with
语句块中发生异常,__exit__
方法会接收到异常的相关信息。__exit__
方法可以选择处理异常(返回 True
),这样异常就不会继续向上传播;如果不处理异常(返回 False
或 None
),异常会继续传播。例如:
class MyContext:
def __enter__(self):
print('Entering context')
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
print(f'Exception handled in __exit__: {exc_type}, {exc_value}')
return True # 处理异常,阻止异常传播
with MyContext() as ctx:
raise ValueError('Test exception')
高并发场景下的优化方案
在高并发场景下,可以使用锁机制(对于多线程)或信号量(对于异步编程)来避免资源竞争。
多线程示例(使用 threading.Lock
)
import threading
class Resource:
def __init__(self):
self.lock = threading.Lock()
def __enter__(self):
self.lock.acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.lock.release()
def worker():
with Resource() as res:
print(f'{threading.current_thread().name} acquired resource')
# 模拟资源操作
import time
time.sleep(1)
print(f'{threading.current_thread().name} released resource')
threads = []
for _ in range(5):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
for t in threads:
t.join()
异步示例(使用 asyncio.Semaphore
)
import asyncio
class AsyncResource:
def __init__(self):
self.semaphore = asyncio.Semaphore(1)
async def __aenter__(self):
await self.semaphore.acquire()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
self.semaphore.release()
async def async_worker():
async with AsyncResource() as res:
print(f'{asyncio.current_task().get_name()} acquired resource')
await asyncio.sleep(1)
print(f'{asyncio.current_task().get_name()} released resource')
async def main():
tasks = [async_worker() for _ in range(5)]
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
在这两个示例中,通过锁或信号量机制,确保同一时间只有一个线程或协程能够访问资源,从而避免资源竞争导致的异常,同时保证了资源的正确管理和系统的并发性能。