1. 概念不同
- 传统多线程编程:
- 锁(Lock):是一种基本的同步原语,用于保证在同一时间只有一个线程能够访问共享资源,防止数据竞争。线程获取锁后才能进入临界区,操作完共享资源后释放锁。
- 信号量(Semaphore):本质也是一种锁机制,但它允许一定数量的线程同时访问共享资源。信号量内部维护一个计数器,每次获取信号量计数器减1,释放信号量计数器加1,当计数器为0时,其他线程无法获取信号量。
- Python异步编程(asyncio库):
- 锁(asyncio.Lock):与多线程中的锁类似,但适用于异步任务。它是一个异步原语,用于确保在同一时间只有一个异步任务可以进入临界区。由于异步任务是非阻塞的,所以asyncio.Lock主要用于控制对异步资源的访问。
- 信号量(asyncio.Semaphore):同样是异步版本,控制同时允许运行的异步任务数量。它维护一个内部计数器,允许一定数量的异步任务同时获取信号量并执行相关操作。
- 分布式系统(基于Celery的架构):
- 锁:在分布式系统中,锁通常基于外部存储(如Redis、Zookeeper等)实现。分布式锁用于协调多个节点上的任务,确保在整个分布式环境中,同一时间只有一个节点能执行特定任务。例如,使用Redis实现的分布式锁,通过设置唯一键值对来表示锁的持有状态。
- 信号量:分布式信号量也依赖外部存储。它用于控制分布式系统中多个节点对共享资源的并发访问。例如,基于Redis的分布式信号量,可以通过对特定键的计数器操作来实现信号量的功能。
2. 使用方式不同
import threading
lock = threading.Lock()
semaphore = threading.Semaphore(3)
def task():
semaphore.acquire()
lock.acquire()
try:
# 访问共享资源的代码
print('Thread is accessing shared resource')
finally:
lock.release()
semaphore.release()
threads = []
for _ in range(5):
t = threading.Thread(target=task)
threads.append(t)
t.start()
for t in threads:
t.join()
import asyncio
async def async_task(semaphore):
async with semaphore:
async with asyncio.Lock():
# 异步访问共享资源的代码
print('Async task is accessing shared resource')
async def main():
semaphore = asyncio.Semaphore(3)
tasks = [async_task(semaphore) for _ in range(5)]
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
- 分布式系统(基于Celery的架构):
假设使用Redis实现分布式锁和信号量
import celery
import redis
app = celery.Celery('tasks', broker='redis://localhost:6379/0')
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
def acquire_distributed_lock(lock_key):
lock_value = redis_client.set(lock_key, 1, nx=True, ex=10)
return lock_value
def release_distributed_lock(lock_key):
redis_client.delete(lock_key)
def acquire_distributed_semaphore(semaphore_key, max_value=3):
while True:
current_value = redis_client.get(semaphore_key)
if current_value is None:
redis_client.set(semaphore_key, max_value)
current_value = max_value
if int(current_value) > 0:
new_value = int(current_value) - 1
redis_client.set(semaphore_key, new_value)
return True
else:
time.sleep(1)
def release_distributed_semaphore(semaphore_key, max_value=3):
current_value = redis_client.get(semaphore_key)
if current_value is None:
redis_client.set(semaphore_key, max_value)
else:
new_value = int(current_value) + 1
if new_value > max_value:
new_value = max_value
redis_client.set(semaphore_key, new_value)
@app.task
def distributed_task():
lock_key = 'distributed_lock'
semaphore_key = 'distributed_semaphore'
if acquire_distributed_lock(lock_key):
if acquire_distributed_semaphore(semaphore_key):
try:
# 分布式任务执行代码
print('Distributed task is executing')
finally:
release_distributed_semaphore(semaphore_key)
release_distributed_lock(lock_key)
3. 在跨多节点的分布式任务队列系统中的运用
- 确保任务正确执行顺序:
- 锁的运用:可以通过在任务执行前获取分布式锁,确保同一时间只有一个节点能执行特定任务。例如,有一个初始化任务需要在所有其他任务之前执行。可以使用分布式锁,在每个节点尝试执行初始化任务前获取锁,获取到锁的节点执行任务,执行完后释放锁,其他节点才能获取锁并执行后续任务。
- 信号量的运用:可以利用信号量控制任务的执行节奏。例如,某些任务依赖于其他任务的输出,通过信号量来控制依赖任务的完成数量,当达到一定数量时,才允许执行后续任务。
- 资源合理分配:
- 锁的运用:对于独占资源(如数据库连接池中的特定连接),使用分布式锁来保证同一时间只有一个节点能获取该资源。节点获取锁后使用资源,使用完释放锁,其他节点可以竞争获取。
- 信号量的运用:假设系统中有有限的计算资源(如GPU资源),可以通过分布式信号量来限制同时使用该资源的任务数量。每个任务在使用资源前获取信号量,使用完后释放,这样可以合理分配资源,避免资源过度使用导致系统崩溃。