实现步骤
- 定义gRPC服务:
- 在
.proto
文件中定义库存服务的接口,比如定义CheckStock
方法,接收订单所需商品信息,返回库存是否充足的结果。示例如下:
syntax = "proto3";
package inventory;
service InventoryService {
rpc CheckStock(StockRequest) returns (StockResponse);
}
message StockRequest {
repeated string product_ids = 1;
repeated int32 quantities = 2;
}
message StockResponse {
bool is_sufficient = 1;
}
- 实现gRPC服务端(库存服务):
- 实现
.proto
文件定义的接口,在CheckStock
方法中查询实际库存并返回结果。例如在Go语言中:
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"log"
"net"
pb "path/to/your/proto"
)
type inventoryServer struct {
pb.UnimplementedInventoryServiceServer
}
func (s *inventoryServer) CheckStock(ctx context.Context, req *pb.StockRequest) (*pb.StockResponse, error) {
// 这里进行实际库存查询逻辑
isSufficient := true
for i := 0; i < len(req.ProductIds); i++ {
// 假设这里有查询库存的函数getStockQuantity
stock := getStockQuantity(req.ProductIds[i])
if stock < int(req.Quantities[i]) {
isSufficient = false
break
}
}
return &pb.StockResponse{IsSufficient: isSufficient}, nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterInventoryServiceServer(s, &inventoryServer{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
- 实现gRPC客户端(订单服务):
- 建立与库存服务的连接,调用
CheckStock
方法。同样以Go语言为例:
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "path/to/your/proto"
)
func main() {
conn, err := grpc.Dial(":50051", grpc.WithInsecure())
if err != nil {
fmt.Printf("did not connect: %v", err)
return
}
defer conn.Close()
c := pb.NewInventoryServiceClient(conn)
productIds := []string{"product1", "product2"}
quantities := []int32{10, 5}
req := &pb.StockRequest{ProductIds: productIds, Quantities: quantities}
resp, err := c.CheckStock(context.Background(), req)
if err != nil {
fmt.Printf("could not check stock: %v", err)
return
}
if!resp.IsSufficient {
// 库存不足,发送补货通知到消息队列
sendRestockNotification(productIds, quantities)
}
}
- 消息队列交互:
- 发送端(订单服务):选择合适的消息队列,如RabbitMQ、Kafka等。以RabbitMQ为例,在Go语言中发送补货通知:
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func sendRestockNotification(productIds []string, quantities []int32) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"restock_queue",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
var message string
for i := 0; i < len(productIds); i++ {
message += fmt.Sprintf("Product: %s, Quantity: %d ", productIds[i], quantities[i])
}
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
if err != nil {
log.Fatalf("Failed to publish a message: %v", err)
}
fmt.Printf(" [x] Sent %s\n", message)
}
- **接收端**:在负责处理补货逻辑的服务中从消息队列接收通知并处理。同样以RabbitMQ和Go语言为例:
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"restock_queue",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 这里进行实际的补货逻辑处理
}
}()
fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
技术难点及解决方案
- gRPC连接问题:
- 难点:网络波动、服务端不可用等可能导致gRPC连接失败。
- 解决方案:实现连接重试机制,如在gRPC客户端使用
retry
库,设置合理的重试次数和时间间隔。同时可以使用服务发现机制,如Consul、Eureka等,让客户端能够动态获取服务端地址,当服务端发生变化时能够及时更新连接。
- 消息队列可靠性:
- 难点:消息可能丢失、重复消费等。
- 解决方案:对于消息丢失问题,在发送端开启消息确认机制(如RabbitMQ的
confirm
模式),确保消息成功发送到队列。在接收端采用手动确认消息的方式,处理完业务逻辑后再确认消息,防止消息处理一半丢失。对于重复消费问题,可以在消息中添加唯一标识,消费端在处理消息前先检查该标识是否已处理过,避免重复处理。
- 分布式事务一致性:
- 难点:订单服务调用库存服务和发送补货通知可能涉及分布式事务,要保证数据一致性。
- 解决方案:采用最终一致性方案,如使用TCC(Try - Confirm - Cancel)模式、Saga模式等。以Saga模式为例,将整个业务流程分解为多个本地事务,通过事件驱动的方式依次执行,如果某个本地事务失败,则按照相反顺序执行补偿事务,以达到最终数据一致性。