面试题答案
一键面试1. 任务入队、出队机制
- 入队机制:使用Redis的
rpush
命令将任务添加到队列的尾部。任务可以是任何可序列化的数据结构,如JSON字符串。 - 出队机制:使用Redis的
lpop
命令从队列的头部取出任务。如果队列为空,lpop
会返回None
。
2. 处理任务的优先级
可以使用Redis的有序集合(Sorted Set)来实现任务优先级。给每个任务分配一个优先级分数,分数越低优先级越高。
- 入队时处理优先级:使用
zadd
命令将任务添加到有序集合中,同时指定任务的优先级分数。 - 出队时处理优先级:使用
zrangebyscore
命令按照优先级分数从小到大获取任务,然后使用zrem
命令从有序集合中移除已获取的任务。
3. 确保任务的原子性和一致性
- 原子性:利用Redis的单线程特性,Redis的命令本身就是原子操作。在Python中,使用
redis - py
库执行Redis命令时,每个命令都是原子的。 - 一致性:在多线程环境下,可以使用
threading.Lock
来确保对Redis操作的一致性。在执行关键的Redis操作(如入队、出队、处理优先级相关操作)时,获取锁,操作完成后释放锁。
4. 可能出现的问题及解决方案
- 队列为空时的阻塞:
lpop
在队列为空时会立即返回None
,可以使用brpop
(阻塞右弹出)命令来实现阻塞式出队,当队列为空时,线程会阻塞等待,直到有新任务入队。 - 多线程竞争:虽然Redis命令是原子的,但多线程同时操作Redis可能导致数据竞争。使用
threading.Lock
可以解决这个问题。 - 任务重复处理:可以为每个任务生成一个唯一标识,在处理任务前先检查任务是否已经被处理过。可以使用Redis的
SETNX
(Set if Not eXists)命令来实现这一点。
5. Python代码实现
import threading
import redis
import json
class DistributedTaskQueue:
def __init__(self, host='localhost', port=6379, db=0):
self.redis_client = redis.StrictRedis(host=host, port=port, db=db)
self.lock = threading.Lock()
self.queue_key = 'task_queue'
self.priority_queue_key = 'priority_task_queue'
def enqueue(self, task, priority=0):
with self.lock:
if priority is None:
self.redis_client.rpush(self.queue_key, json.dumps(task))
else:
self.redis_client.zadd(self.priority_queue_key, {json.dumps(task): priority})
def dequeue(self):
with self.lock:
task = self.redis_client.lpop(self.queue_key)
if task is None:
tasks = self.redis_client.zrangebyscore(self.priority_queue_key, 0, +inf, start=0, num=1)
if tasks:
task = tasks[0]
self.redis_client.zrem(self.priority_queue_key, task)
if task:
return json.loads(task)
return None
def worker(task_queue):
while True:
task = task_queue.dequeue()
if task:
print(f"Processing task: {task}")
# 模拟任务处理
import time
time.sleep(1)
else:
import time
time.sleep(1)
if __name__ == "__main__":
task_queue = DistributedTaskQueue()
# 启动多个工作线程
num_workers = 3
for _ in range(num_workers):
t = threading.Thread(target=worker, args=(task_queue,))
t.start()
# 添加任务
tasks = [{"id": 1, "name": "task1"}, {"id": 2, "name": "task2", "priority": 1}]
for task in tasks:
priority = task.get('priority')
task_queue.enqueue(task, priority)
上述代码实现了一个分布式任务队列,支持任务的入队、出队以及优先级处理,并且通过锁机制确保了多线程环境下操作的原子性和一致性。