require 'thread'
# 任务类
class Task
def initialize(name, &block)
@name = name
@block = block
@dependencies = []
end
def add_dependency(task)
@dependencies << task
end
def execute
puts "Starting task #{@name}"
@block.call
puts "Finished task #{@name}"
end
def dependencies_completed?(completed_tasks)
@dependencies.all? { |dependency| completed_tasks.include?(dependency) }
end
end
# 任务调度器类
class TaskScheduler
def initialize
@tasks = []
@completed_tasks = []
@mutex = Mutex.new
@condition = ConditionVariable.new
end
def add_task(task)
@tasks << task
end
def run
threads = []
@tasks.each do |task|
threads << Thread.new do
loop do
@mutex.synchronize do
while!task.dependencies_completed?(@completed_tasks)
@condition.wait(@mutex)
end
task.execute
@completed_tasks << task
@condition.broadcast
end
end
end
end
threads.each(&:join)
end
end
# 示例使用
scheduler = TaskScheduler.new
# 模拟文件下载任务
download_task = Task.new('File Download') do
sleep 2 # 模拟下载时间
end
# 模拟数据处理任务,依赖文件下载任务
process_task = Task.new('Data Process') do
sleep 3 # 模拟处理时间
end
process_task.add_dependency(download_task)
# 模拟结果上传任务,依赖数据处理任务
upload_task = Task.new('Result Upload') do
sleep 1 # 模拟上传时间
end
upload_task.add_dependency(process_task)
scheduler.add_task(download_task)
scheduler.add_task(process_task)
scheduler.add_task(upload_task)
scheduler.run
线程同步与资源竞争处理方式
- 线程同步:
- 使用
Mutex
来保护共享资源(这里主要是 @completed_tasks
数组),确保在多线程环境下对其的读写操作是安全的。
ConditionVariable
用于线程间的通信。当一个任务的依赖未完成时,线程会调用 @condition.wait(@mutex)
进入等待状态,释放 Mutex
。当有任务完成时,会调用 @condition.broadcast
唤醒所有等待的线程,这些线程会重新获取 Mutex
并检查自己的依赖是否已完成。
- 资源竞争:
- 通过
Mutex
对共享资源的访问进行控制,避免多个线程同时修改 @completed_tasks
导致数据不一致。由于只有被唤醒且依赖完成的任务才会执行,所以不会出现对同一资源的无序竞争情况。每个任务执行时是独立的,不会干扰其他任务,除非它们之间存在依赖关系。