require 'thread'
# 创建一个线程安全的队列
queue = Queue.new
mutex = Mutex.new
# 模拟多个数据源,每个数据源是一个独立的线程
source_threads = []
3.times do |i|
source_threads << Thread.new do
loop do
# 模拟数据到达时间不确定
sleep rand(1..3)
data = "This is a sample string #{i + 1}"
mutex.synchronize do
queue << data
end
end
end
end
# 主线程处理队列中的数据
main_thread = Thread.new do
loop do
data = mutex.synchronize { queue.pop }
word_count = data.split.size
puts "Data: #{data}, Word count: #{word_count}"
end
end
# 等待所有线程结束(实际应用中可以根据逻辑来控制结束条件)
source_threads.each(&:join)
main_thread.join