实现原理
- 锁(Lock):
- 锁是一种简单的同步原语,它只有两种状态:锁定(locked)和未锁定(unlocked)。
- 当一个线程获取锁时,锁进入锁定状态,其他线程试图获取锁时将被阻塞,直到锁被释放回到未锁定状态。
- 在Python中,
threading.Lock
类实现了锁机制,其内部使用操作系统提供的互斥锁(mutex)来保证同一时间只有一个线程能进入临界区。
- 信号量(Semaphore):
- 信号量内部维护一个计数器,其值表示当前可用的资源数量。
- 当一个线程获取信号量时,计数器减1;当一个线程释放信号量时,计数器加1。
- 如果计数器的值为0,那么试图获取信号量的线程将被阻塞,直到其他线程释放信号量使计数器大于0。Python中的
threading.Semaphore
类基于此原理实现。
功能特性
- 锁(Lock):
- 主要功能是保证在同一时刻只有一个线程可以访问共享资源,防止数据竞争和不一致问题。
- 它是二元的,即要么锁定要么未锁定,适用于需要严格独占访问的场景。
- 一旦一个线程获取了锁,其他线程必须等待锁的释放才能获取。
- 信号量(Semaphore):
- 可以控制同时访问共享资源的线程数量。通过设置信号量的初始值,可以允许一定数量的线程同时进入临界区。
- 相比锁,它更灵活,适用于需要限制并发访问数量的场景,而不是完全独占。
适用场景
- 锁(Lock)的适用场景:
- 场景例子:银行账户转账操作。假设有一个银行账户类,有存款和取款方法,账户余额是共享资源。
- 原因:在进行转账操作时,需要确保对账户余额的读取、修改等操作是原子性的,不能有多个线程同时操作,否则可能导致余额计算错误。例如:
import threading
class BankAccount:
def __init__(self, balance):
self.balance = balance
self.lock = threading.Lock()
def withdraw(self, amount):
self.lock.acquire()
try:
if self.balance >= amount:
self.balance -= amount
print(f"成功取款 {amount},余额为 {self.balance}")
else:
print("余额不足")
finally:
self.lock.release()
account = BankAccount(1000)
t1 = threading.Thread(target=account.withdraw, args=(500,))
t2 = threading.Thread(target=account.withdraw, args=(300,))
t1.start()
t2.start()
t1.join()
t2.join()
- 信号量(Semaphore)的适用场景:
- 场景例子:数据库连接池。假设有一个数据库连接池,里面有固定数量的数据库连接对象,多个线程需要使用数据库连接执行SQL查询。
- 原因:由于数据库连接资源有限,不能让所有线程无限制地获取连接,否则可能导致数据库服务器过载。通过信号量可以限制同时获取连接的线程数量,确保连接资源的合理使用。例如:
import threading
import time
class DatabaseConnectionPool:
def __init__(self, max_connections):
self.max_connections = max_connections
self.semaphore = threading.Semaphore(max_connections)
self.connections = [f"Connection {i}" for i in range(max_connections)]
def get_connection(self):
self.semaphore.acquire()
connection = self.connections.pop(0)
print(f"获取连接: {connection}")
return connection
def release_connection(self, connection):
self.connections.append(connection)
print(f"释放连接: {connection}")
self.semaphore.release()
pool = DatabaseConnectionPool(3)
def query_database():
connection = pool.get_connection()
try:
print(f"使用连接 {connection} 执行查询")
time.sleep(2)
finally:
pool.release_connection(connection)
threads = []
for _ in range(5):
t = threading.Thread(target=query_database)
threads.append(t)
t.start()
for t in threads:
t.join()