设计思路
节点间通信
- 使用消息队列:例如RabbitMQ、Kafka等。各个节点连接到同一个消息队列,任务被发送到队列中,节点从队列中获取任务。这种方式解耦了任务的生产和消费,提高系统的扩展性和稳定性。
- 心跳机制:每个节点定期向中心协调者(或通过广播)发送心跳消息,告知自己的存活状态。可以使用简单的HTTP请求或者基于UDP的轻量级协议。
任务分配策略
- 轮询策略:按照节点注册的顺序,依次将任务分配给各个节点。这种策略简单易实现,但可能导致节点负载不均衡。
- 基于负载的分配:每个节点实时向中心协调者汇报自己的当前负载(如CPU使用率、内存使用率、当前处理的任务数等)。协调者根据这些信息,将任务分配给负载最轻的节点。
容错机制
- 任务重试:如果某个节点在处理任务时失败,任务可以被重新放回消息队列,等待其他节点重新处理。可以设置重试次数和重试间隔,避免无限重试。
- 节点故障检测与替换:通过心跳机制,如果中心协调者在一定时间内没有收到某个节点的心跳消息,则判定该节点故障。将该节点从可用节点列表中移除,并重新分配该节点上未完成的任务到其他可用节点。
核心功能伪代码
任务生产者
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue",
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
task := "example task"
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(task),
},
)
if err != nil {
panic(err)
}
fmt.Println("Task sent to queue")
}
任务消费者(节点端)
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue",
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
msgs, err := ch.Consume(
q.Name,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
go func() {
for d := range msgs {
fmt.Printf("Received a task: %s\n", d.Body)
// 模拟任务处理
err := processTask(string(d.Body))
if err != nil {
// 处理失败,放回队列
err = ch.Nack(d.DeliveryTag, false, true)
if err != nil {
fmt.Printf("Failed to nack task: %v", err)
}
} else {
// 处理成功,确认任务
err = ch.Ack(d.DeliveryTag, false)
if err != nil {
fmt.Printf("Failed to ack task: %v", err)
}
}
}
}()
fmt.Println("Waiting for tasks...")
select {}
}
func processTask(task string) error {
// 实际任务处理逻辑
fmt.Printf("Processing task: %s\n", task)
return nil
}
中心协调者(简单示例,基于心跳检测)
package main
import (
"fmt"
"time"
)
type Node struct {
ID string
LastHeart time.Time
}
var nodes = make(map[string]*Node)
func monitorNodes() {
for {
for id, node := range nodes {
if time.Since(node.LastHeart) > 10*time.Second {
// 节点故障处理
delete(nodes, id)
fmt.Printf("Node %s is down, removing from the list\n", id)
}
}
time.Sleep(5 * time.Second)
}
}
func main() {
go monitorNodes()
// 模拟节点注册与心跳
go func() {
nodeID := "node1"
nodes[nodeID] = &Node{ID: nodeID, LastHeart: time.Now()}
for {
nodes[nodeID].LastHeart = time.Now()
fmt.Printf("Node %s sent heartbeat\n", nodeID)
time.Sleep(3 * time.Second)
}
}()
select {}
}