面试题答案
一键面试1. 资源分配
- CPU 资源分配:
- 原因:CPU 密集型任务会占用大量 CPU 时间,若处理不当会导致整个应用卡顿,影响网络 I/O 等其他任务执行。
- 实现思路:将 CPU 密集型任务与网络 I/O 任务分离。对于 CPU 密集型任务,可以使用
concurrent.futures.ProcessPoolExecutor
来利用多进程并行处理,因为 Python 的全局解释器锁(GIL)限制了多线程在 CPU 密集型任务上的并行性,而多进程可以绕过 GIL 充分利用多核 CPU。例如:
import concurrent.futures
import asyncio
def cpu_bound_task():
# 模拟 CPU 密集型任务
result = 0
for i in range(100000000):
result += i
return result
async def main():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as executor:
result = await loop.run_in_executor(executor, cpu_bound_task)
print(f"CPU 密集型任务结果: {result}")
if __name__ == "__main__":
asyncio.run(main())
- 网络资源分配:
- 原因:大量网络 I/O 操作需要合理的资源配置以避免网络阻塞或资源耗尽。
- 实现思路:使用
aiohttp
等异步网络库,其内部对网络连接池等进行了优化。可以设置连接池的大小,例如在aiohttp
中:
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=10)) as session:
tasks = []
urls = ["http://example.com"] * 20
for url in urls:
task = asyncio.create_task(fetch(session, url))
tasks.append(task)
results = await asyncio.gather(*tasks)
print(results)
if __name__ == "__main__":
asyncio.run(main())
这里 limit = 10
设置了连接池的最大连接数为 10,防止过多连接耗尽网络资源。
2. 协程调度
- 合理创建与管理协程数量:
- 原因:创建过多协程会消耗大量内存且可能导致调度开销过大,过少则无法充分利用资源。
- 实现思路:根据系统资源(如 CPU 核心数、内存大小等)和任务特性动态调整协程数量。例如,可以使用
asyncio.Semaphore
来限制并发执行的协程数量。假设我们有一个任务函数task
:
import asyncio
async def task(semaphore, num):
async with semaphore:
await asyncio.sleep(1)
print(f"任务 {num} 完成")
async def main():
semaphore = asyncio.Semaphore(5) # 最多允许 5 个协程同时执行
tasks = []
for i in range(10):
task_i = asyncio.create_task(task(semaphore, i))
tasks.append(task_i)
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
- 任务优先级调度:
- 原因:有些任务可能对响应时间要求较高,需要优先执行。
- 实现思路:可以使用优先级队列来管理协程。例如,定义一个简单的优先级队列类:
import asyncio
import heapq
class PriorityQueue:
def __init__(self):
self.pq = []
self.counter = 0
def put(self, item, priority):
entry = (-priority, self.counter, item)
heapq.heappush(self.pq, entry)
self.counter += 1
def get(self):
_, _, item = heapq.heappop(self.pq)
return item
def empty(self):
return len(self.pq) == 0
async def high_priority_task():
await asyncio.sleep(1)
print("高优先级任务完成")
async def low_priority_task():
await asyncio.sleep(2)
print("低优先级任务完成")
async def main():
pq = PriorityQueue()
pq.put(asyncio.create_task(high_priority_task()), 1) # 优先级 1 较高
pq.put(asyncio.create_task(low_priority_task()), 0) # 优先级 0 较低
tasks = []
while not pq.empty():
task = pq.get()
tasks.append(task)
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
3. 内存管理
- 避免内存泄漏:
- 原因:在复杂的异步应用中,尤其是涉及大量对象创建和销毁时,容易出现内存泄漏,导致内存占用不断上升最终耗尽内存。
- 实现思路:
- 及时释放不再使用的对象引用。例如,在处理完网络响应数据后,及时删除对响应对象的引用,Python 的垃圾回收机制会回收相关内存。
- 对于长时间运行的异步任务,定期检查和清理缓存。比如使用
functools.lru_cache
进行缓存时,可以设置缓存的最大大小,避免缓存无限增长:
import functools
@functools.lru_cache(maxsize = 128)
def cached_function(x):
# 模拟计算
return x * x
- 优化数据结构使用:
- 原因:合适的数据结构可以减少内存占用并提高操作效率。
- 实现思路:
- 对于频繁插入和删除操作的场景,使用
collections.deque
替代列表,因为deque
在两端操作的时间复杂度为 O(1),而列表在头部插入删除的时间复杂度为 O(n)。 - 对于存储大量唯一元素且需要快速查找的场景,使用
set
或dict
。例如,如果需要记录已经处理过的网络请求 URL 以避免重复处理,可以使用set
:
- 对于频繁插入和删除操作的场景,使用
processed_urls = set()
url = "http://example.com"
if url not in processed_urls:
processed_urls.add(url)
# 处理 URL