MST

星途 面试题库

面试题:Python多线程在Web开发中可能面临哪些资源竞争问题及如何解决

在Python的Web开发中使用多线程,经常会遇到资源竞争的情况。请举例说明可能出现资源竞争的场景,如对数据库连接的访问、对共享内存数据的读写等,并阐述至少两种解决资源竞争问题的方法,比如使用锁机制,具体说明如何在代码中实现这些解决方案。
32.7万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

可能出现资源竞争的场景

  1. 数据库连接访问:多个线程同时尝试获取数据库连接进行读写操作。例如在一个Web应用中,不同线程处理不同用户请求,都需要查询数据库获取用户信息。如果没有适当的控制,可能会出现连接池耗尽、数据不一致等问题。
  2. 共享内存数据读写:比如多个线程都要对一个全局变量(类似共享内存概念)进行读写。假设这个全局变量记录网站的访问次数,每个线程处理一次访问请求时都要对其加1。如果没有同步机制,可能会导致数据不准确。

解决资源竞争问题的方法

  1. 锁机制(Lock)
    • 原理:通过获取锁来保证同一时间只有一个线程可以访问共享资源,访问完成后释放锁,其他线程才能获取锁进行访问。
    • 代码示例
import threading

# 共享资源
counter = 0
lock = threading.Lock()


def increment():
    global counter
    lock.acquire()
    try:
        counter += 1
    finally:
        lock.release()


threads = []
for _ in range(10):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final counter value: {counter}")
  1. 信号量(Semaphore)
    • 原理:信号量允许一定数量的线程同时访问共享资源。与锁不同,锁是只允许一个线程访问,而信号量可以允许多个线程同时访问,只要访问线程数不超过设定的数量。
    • 代码示例
import threading

# 创建信号量,允许3个线程同时访问
semaphore = threading.Semaphore(3)


def access_shared_resource():
    semaphore.acquire()
    try:
        # 模拟访问共享资源
        print(f"{threading.current_thread().name} is accessing shared resource")
    finally:
        semaphore.release()


threads = []
for i in range(10):
    t = threading.Thread(target=access_shared_resource)
    threads.append(t)
    t.start()

for t in threads:
    t.join()
  1. 队列(Queue)
    • 原理:将需要处理的任务放入队列,每个线程从队列中获取任务进行处理,避免了对共享资源的直接竞争。
    • 代码示例
import threading
import queue

# 创建队列
task_queue = queue.Queue()


def worker():
    while True:
        try:
            task = task_queue.get(timeout = 1)
            # 处理任务
            print(f"{threading.current_thread().name} is processing task: {task}")
            task_queue.task_done()
        except queue.Empty:
            break


# 放入任务
for i in range(10):
    task_queue.put(f"Task {i}")

# 创建线程
threads = []
for _ in range(3):
    t = threading.Thread(target = worker)
    threads.append(t)
    t.start()

# 等待所有任务完成
task_queue.join()

for t in threads:
    t.join()