面试题答案
一键面试并发控制面临的挑战
- 网络延迟:网络延迟可能导致消息传输缓慢,使得不同节点上的Goroutine之间的同步操作出现较大延迟,影响协同工作效率。例如,在获取分布式锁时,由于网络延迟,可能长时间等待锁的响应,造成资源浪费。
- 节点故障:某个节点发生故障可能导致其持有的锁无法正常释放,或者正在处理的任务中断。如果没有合适的机制,可能会造成死锁或数据不一致问题。例如,一个节点在持有锁的过程中突然崩溃,其他节点可能永远无法获取该锁。
- 时钟差异:不同节点的系统时钟可能存在差异,这在一些依赖时间戳的并发控制机制(如分布式锁的过期时间判断)中可能导致不一致问题。例如,一个节点认为锁已经过期可以获取,而另一个节点认为锁仍然有效。
解决方法 - 分布式锁
- 基于Redis的分布式锁:
- 原理:利用Redis的单线程特性和原子操作来实现分布式锁。通过SETNX(SET if Not eXists)命令尝试设置一个键值对,如果设置成功则表示获取到锁,否则获取失败。同时可以设置锁的过期时间来防止死锁。
- 示例代码(Go语言):
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
var ctx = context.Background()
func acquireLock(client *redis.Client, lockKey string, value string, expiration time.Duration) bool {
set, err := client.SetNX(ctx, lockKey, value, expiration).Result()
if err != nil {
fmt.Println("Error setting lock:", err)
return false
}
return set
}
func releaseLock(client *redis.Client, lockKey string, value string) {
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
_, err := client.Eval(ctx, script, []string{lockKey}, value).Int64()
if err != nil {
fmt.Println("Error releasing lock:", err)
}
}
func main() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
lockKey := "my_distributed_lock"
value := "unique_value"
expiration := 10 * time.Second
if acquireLock(client, lockKey, value, expiration) {
defer releaseLock(client, lockKey, value)
// 这里是获取到锁后执行的业务逻辑
fmt.Println("Acquired lock, doing business logic...")
} else {
fmt.Println("Failed to acquire lock.")
}
}
- 基于ZooKeeper的分布式锁:
- 原理:利用ZooKeeper的临时有序节点特性。每个节点尝试创建一个临时有序节点,最小序号的节点获取到锁。当持有锁的节点故障时,其临时节点会自动删除,其他节点可以重新竞争锁。
- 示例代码(Go语言,使用go-zookeeper/zk库):
package main
import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"time"
)
func acquireLock(conn *zk.Conn, lockPath string) (bool, string, error) {
// 创建临时有序节点
createdPath, err := conn.CreateProtectedEphemeralSequential(lockPath+"/lock-", []byte{}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
return false, "", err
}
// 获取所有子节点
children, _, err := conn.Children(lockPath)
if err != nil {
return false, "", err
}
// 找到最小序号的节点
minPath := lockPath + "/" + children[0]
if createdPath == minPath {
return true, createdPath, nil
}
// 监听前一个节点的删除事件
watchCh := make(chan zk.Event, 1)
previousPath := lockPath + "/" + children[0]
for _, child := range children {
if lockPath+"/"+child == createdPath {
break
}
previousPath = lockPath + "/" + child
}
_, _, watch, err := conn.GetW(previousPath)
if err != nil {
return false, "", err
}
// 等待前一个节点删除
for {
select {
case event := <-watchCh:
if event.Type == zk.EventNodeDeleted && event.Path == previousPath {
return true, createdPath, nil
}
case <-time.After(5 * time.Second):
return false, "", fmt.Errorf("timed out waiting for lock")
}
}
}
func releaseLock(conn *zk.Conn, lockPath string) error {
return conn.Delete(lockPath, -1)
}
func main() {
servers := []string{"localhost:2181"}
conn, _, err := zk.Connect(servers, 5*time.Second)
if err != nil {
fmt.Println("Error connecting to ZooKeeper:", err)
return
}
defer conn.Close()
lockPath := "/my_distributed_lock"
acquired, lockNode, err := acquireLock(conn, lockPath)
if err != nil {
fmt.Println("Error acquiring lock:", err)
return
}
if acquired {
defer releaseLock(conn, lockNode)
fmt.Println("Acquired lock, doing business logic...")
} else {
fmt.Println("Failed to acquire lock.")
}
}
解决方法 - 分布式队列
- 基于Kafka的分布式队列:
- 原理:Kafka是一个高吞吐量的分布式消息队列。不同节点上的Goroutine可以将任务消息发送到Kafka的主题(Topic)中,每个节点可以从主题的分区中消费消息,按照顺序处理任务,实现协同工作。
- 示例代码(Go语言,使用confluent-kafka-go库): 生产者代码:
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
fmt.Printf("Failed to create producer: %s\n", err)
return
}
defer p.Close()
topic := "my_topic"
for i := 0; i < 10; i++ {
message := fmt.Sprintf("Task %d", i)
err := p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(message),
}, nil)
if err != nil {
fmt.Printf("Failed to produce message: %s\n", err)
}
}
p.Flush(15 * 1000)
}
消费者代码:
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "my_group",
"auto.offset.reset": "earliest",
})
if err != nil {
fmt.Printf("Failed to create consumer: %s\n", err)
return
}
defer c.Close()
topic := "my_topic"
err = c.SubscribeTopics([]string{topic}, nil)
if err != nil {
fmt.Printf("Failed to subscribe to topic: %s\n", err)
return
}
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Received message: %s\n", string(msg.Value))
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
}
- 基于RabbitMQ的分布式队列:
- 原理:RabbitMQ是一个消息代理,支持多种消息传递模式。可以使用简单队列模式,不同节点的Goroutine将任务发送到队列,消费者从队列中获取任务并处理。
- 示例代码(Go语言,使用streadway/amqp库): 生产者代码:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"my_queue",
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
for i := 0; i < 10; i++ {
message := fmt.Sprintf("Task %d", i)
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
failOnError(err, "Failed to publish a message")
}
fmt.Println("Messages sent to the queue.")
}
消费者代码:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"my_queue",
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
fmt.Printf("Received a message: %s\n", d.Body)
}
}()
fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}