互斥锁(Mutex)
- 原理:互斥锁是一种二元信号量,它只有两种状态:锁定和解锁。当一个线程获取到互斥锁(将其状态设为锁定),其他线程就无法再获取,直到该线程释放互斥锁(将其状态设为解锁)。这样就保证了同一时间只有一个线程能够访问共享资源,从而避免资源竞争。
- 适用场景:适用于每次只允许一个线程访问共享资源的场景,例如对共享文件的读写操作。在一个多线程的文件管理系统中,当一个线程要对某个文件进行写入操作时,它首先获取互斥锁,其他线程此时就不能再获取锁来进行文件写入,保证了文件数据的一致性。
- 示例代码(Python):
import threading
# 创建互斥锁
mutex = threading.Lock()
shared_resource = 0
def increment():
global shared_resource
mutex.acquire()
try:
shared_resource += 1
finally:
mutex.release()
threads = []
for _ in range(10):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(shared_resource)
信号量(Semaphore)
- 原理:信号量维护了一个计数器,线程获取信号量时,计数器减1,当计数器为0时,其他线程无法获取信号量,直到有线程释放信号量(计数器加1)。它允许多个线程同时访问共享资源,但数量有限制。
- 适用场景:适用于有一定数量限制的共享资源访问场景,例如数据库连接池。假设连接池最多有10个连接,就可以创建一个初始值为10的信号量。每个线程要获取数据库连接时,先获取信号量,如果信号量计数器大于0,获取成功并减1,就可以使用一个连接;当线程使用完连接后,释放信号量,计数器加1。
- 示例代码(Python):
import threading
# 创建信号量,最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
shared_resource_list = []
def access_resource():
semaphore.acquire()
try:
shared_resource_list.append(threading.current_thread().name)
print(f"{threading.current_thread().name} 正在访问资源")
finally:
semaphore.release()
threads = []
for _ in range(5):
t = threading.Thread(target=access_resource)
threads.append(t)
t.start()
for t in threads:
t.join()
print(shared_resource_list)
条件变量(Condition Variable)
- 原理:条件变量通常与互斥锁一起使用。线程可以在条件变量上等待某个条件满足,当另一个线程改变了相关条件后,通过条件变量通知等待的线程,等待的线程被唤醒后重新检查条件是否满足,决定是否继续执行。
- 适用场景:适用于线程需要根据某个条件的变化来决定是否执行的场景,例如生产者 - 消费者模型。生产者线程生产数据放入共享队列,消费者线程从队列中取出数据。当队列空时,消费者线程在条件变量上等待,生产者线程生产数据后,通过条件变量通知消费者线程队列中有数据了,消费者线程被唤醒后检查队列,然后取出数据。
- 示例代码(Python):
import threading
# 创建互斥锁和条件变量
mutex = threading.Lock()
condition = threading.Condition(mutex)
shared_queue = []
def producer():
global shared_queue
for i in range(5):
with mutex:
shared_queue.append(i)
print(f"生产者生产了: {i}")
condition.notify()
def consumer():
global shared_queue
while True:
with mutex:
while not shared_queue:
condition.wait()
item = shared_queue.pop(0)
print(f"消费者消费了: {item}")
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()