MST

星途 面试题库

面试题:Python asyncio中的任务优先级处理

假设你正在使用asyncio编写一个应用,其中有不同优先级的任务(例如,高优先级的实时数据处理任务和低优先级的日志记录任务)。请描述如何在asyncio中实现任务优先级的调度,给出具体的代码示例和解释。
17.1万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

asyncio中,默认的任务调度是基于FIFO(先进先出)原则,并没有直接支持任务优先级的调度。但是我们可以通过一些方法来模拟任务优先级的调度。一种常见的方法是使用优先级队列。

以下是实现代码示例及解释:

import asyncio
import heapq


class PriorityQueue(asyncio.Queue):
    def __init__(self):
        super().__init__()
        self._queue = []

    def _put(self, item):
        heapq.heappush(self._queue, item)

    def _get(self):
        return heapq.heappop(self._queue)


async def task_function(priority, task_name):
    await asyncio.sleep(1)
    print(f"Priority {priority}: Task {task_name} completed")


async def main():
    pq = PriorityQueue()

    # 添加不同优先级的任务
    await pq.put((1, task_function(1, "High Priority Task")))
    await pq.put((3, task_function(3, "Low Priority Task")))
    await pq.put((2, task_function(2, "Medium Priority Task")))

    tasks = []
    while not pq.empty():
        _, task = await pq.get()
        tasks.append(asyncio.create_task(task))

    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

代码解释:

  1. PriorityQueue类
    • 我们定义了一个PriorityQueue类,继承自asyncio.Queue。这个类重写了_put_get方法,使用heapq模块来实现基于优先级的队列。heapq是Python的堆队列算法模块,它会自动维护一个最小堆,确保每次取出的元素是优先级最高的(这里数字越小优先级越高)。
  2. task_function函数
    • 这是一个简单的异步任务函数,模拟任务的执行,通过asyncio.sleep(1)模拟任务执行需要1秒的时间,然后打印任务完成的信息。
  3. main函数
    • 创建了一个PriorityQueue实例pq
    • 使用pq.put方法向队列中添加不同优先级的任务,任务以元组的形式添加,元组的第一个元素是优先级,第二个元素是实际的任务函数。
    • 通过循环从队列中取出任务,并使用asyncio.create_task创建任务对象,添加到tasks列表中。
    • 最后使用asyncio.gather等待所有任务完成。

通过这种方式,我们可以在asyncio中模拟实现任务优先级的调度。