package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Producer接口定义生产者行为
type Producer interface {
Produce(chan interface{})
}
// Consumer接口定义消费者行为
type Consumer interface {
Consume(chan interface{})
}
// ProducerNode结构体实现Producer接口
type ProducerNode struct {
ID int
}
func (p ProducerNode) Produce(output chan interface{}) {
for {
// 模拟生产数据
data := fmt.Sprintf("Data from Producer %d", p.ID)
// 模拟网络延迟
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
output <- data
}
}
// ConsumerNode结构体实现Consumer接口
type ConsumerNode struct {
ID int
}
func (c ConsumerNode) Consume(input chan interface{}) {
for data := range input {
// 模拟消费数据
fmt.Printf("Consumer %d consumed: %v\n", c.ID, data)
// 模拟网络延迟
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
}
}
// NodeManager结构体管理节点
type NodeManager struct {
producers []Producer
consumers []Consumer
nodes map[int]interface{}
mu sync.Mutex
}
// AddProducer方法添加生产者节点
func (nm *NodeManager) AddProducer(p Producer) {
nm.mu.Lock()
nm.producers = append(nm.producers, p)
nm.nodes[len(nm.producers)-1] = p
nm.mu.Unlock()
}
// AddConsumer方法添加消费者节点
func (nm *NodeManager) AddConsumer(c Consumer) {
nm.mu.Lock()
nm.consumers = append(nm.consumers, c)
nm.nodes[len(nm.consumers)-1] = c
nm.mu.Unlock()
}
// RemoveNode方法移除节点
func (nm *NodeManager) RemoveNode(nodeID int) {
nm.mu.Lock()
delete(nm.nodes, nodeID)
newProducers := make([]Producer, 0, len(nm.producers))
newConsumers := make([]Consumer, 0, len(nm.consumers))
for _, p := range nm.producers {
if _, ok := nm.nodes[getID(p)]; ok {
newProducers = append(newProducers, p)
}
}
for _, c := range nm.consumers {
if _, ok := nm.nodes[getID(c)]; ok {
newConsumers = append(newConsumers, c)
}
}
nm.producers = newProducers
nm.consumers = newConsumers
nm.mu.Unlock()
}
func getID(node interface{}) int {
switch v := node.(type) {
case ProducerNode:
return v.ID
case ConsumerNode:
return v.ID
default:
return -1
}
}
func main() {
nm := &NodeManager{
nodes: make(map[int]interface{}),
}
// 添加生产者节点
producer1 := ProducerNode{ID: 1}
producer2 := ProducerNode{ID: 2}
nm.AddProducer(producer1)
nm.AddProducer(producer2)
// 添加消费者节点
consumer1 := ConsumerNode{ID: 1}
consumer2 := ConsumerNode{ID: 2}
nm.AddConsumer(consumer1)
nm.AddConsumer(consumer2)
dataCh := make(chan interface{})
// 启动生产者
for _, p := range nm.producers {
go p.Produce(dataCh)
}
// 启动消费者
for _, c := range nm.consumers {
go c.Consume(dataCh)
}
// 模拟运行一段时间
time.Sleep(5 * time.Second)
// 移除一个生产者节点
nm.RemoveNode(1)
// 继续运行一段时间
time.Sleep(3 * time.Second)
}
- 接口定义:
- 定义了
Producer
和Consumer
接口,分别表示生产者和消费者的行为。
- 节点结构体:
ProducerNode
结构体实现了Producer
接口,通过Produce
方法模拟生产数据,并添加了网络延迟模拟。
ConsumerNode
结构体实现了Consumer
接口,通过Consume
方法模拟消费数据,并添加了网络延迟模拟。
- 节点管理器:
NodeManager
结构体用于管理节点,包含了生产者和消费者的切片以及一个节点映射。
AddProducer
和AddConsumer
方法用于动态添加节点。
RemoveNode
方法用于动态移除节点,并更新生产者和消费者的切片。
- 主函数:
- 在
main
函数中,创建了节点管理器,添加了生产者和消费者节点。
- 启动生产者和消费者的协程,模拟系统运行。
- 运行一段时间后,移除一个生产者节点,继续模拟系统运行,展示了动态移除节点的功能。