高并发写入下 ElasticSearch 负载均衡类 decider 面临的性能挑战
- 网络延迟:高并发写入时,大量请求同时到达,网络拥塞可能导致数据传输延迟,decider 无法及时获取各节点状态,影响负载均衡决策的及时性与准确性。
- 节点资源压力:众多写入请求使各节点 CPU、内存和磁盘 I/O 等资源紧张,decider 需要快速评估节点资源状况,但资源竞争可能使评估结果不准确,导致错误的负载分配。
- 数据一致性维护:高并发写入可能引发数据版本冲突,decider 需在保证负载均衡的同时,确保数据一致性,这增加了决策复杂度与性能压力。
- 集群状态频繁更新:大量写入操作会频繁触发集群状态变化,decider 要实时处理这些变化,以调整负载均衡策略,处理速度跟不上变化频率会导致策略滞后。
优化负载均衡策略保障数据一致性与集群稳定性的机制
- 基于权重的负载均衡:根据节点的硬件资源(如 CPU 核心数、内存大小等)、网络带宽等因素为每个节点分配权重。权重较高的节点接收更多的写入请求,源码中在
NodeAllocationDecider
相关类里,通过计算节点权重,在决策写入节点时优先考虑权重高的节点。例如在 WeightedRoundRobinAllocator
类中,会根据权重循环选择节点,保障资源利用更合理。
- 副本分配策略:为保障数据一致性和集群高可用性,Elasticsearch 采用副本机制。在写入数据时,decider 会考虑副本的分布情况,避免副本集中在少数节点。在
AllocationService
类的 decide
方法中,会检查副本是否均匀分布在不同节点上,若不符合要求则重新分配,确保数据在集群故障时的可恢复性和一致性。
- 动态感知集群状态:通过集群状态更新机制,decider 实时获取集群各节点状态信息,如节点是否健康、负载情况等。
ClusterStateListener
监听集群状态变化事件,当状态改变时,AllocationService
会重新计算负载均衡策略。在 ClusterStateUpdateTask
类中,会根据新的集群状态信息调整写入请求的分配。
- 写入队列与限流:为防止高并发写入瞬间压垮节点,Elasticsearch 为每个节点设置写入队列,并采用限流机制。当节点负载过高时,写入请求会进入队列等待处理,超出限流阈值的请求会被拒绝。在
CircuitBreakerService
类中实现了限流逻辑,通过设置不同类型的断路器(如 fielddata
、request
等),监控资源使用情况,避免资源耗尽,保障集群稳定性。
关键实现逻辑源码分析
NodeAllocationDecider
类:它是负载均衡决策的核心类之一。在 canAllocate
方法中,会检查节点是否满足分配条件,如节点是否可用、磁盘空间是否充足等。该方法综合考虑节点权重、副本分布等因素,决定是否将新的写入请求分配到该节点。
public class NodeAllocationDecider {
public Decision canAllocate(ShardRouting shardRouting, ClusterState state, Node node) {
// 检查节点是否可用
if (!node.isNodeRole(NodeRole.DATA_ROLE)) {
return Decision.NO;
}
// 检查磁盘空间
long diskFree = state.getMetaData().customs().getOrDefault(NODE_DISK_FREE_KEY, 0L);
if (diskFree < MIN_DISK_FREE) {
return Decision.NO;
}
// 考虑副本分布
if (!isReplicaDistributionSatisfied(shardRouting, state)) {
return Decision.NO;
}
// 根据权重等因素决策
int weight = calculateWeight(node);
if (weight < MIN_ALLOCATION_WEIGHT) {
return Decision.NO;
}
return Decision.YES;
}
}
WeightedRoundRobinAllocator
类:实现基于权重的负载均衡算法。nextNode
方法通过维护一个权重数组,循环选择权重最高的节点。
public class WeightedRoundRobinAllocator {
private final Map<Node, Integer> weights;
private int currentIndex = 0;
public WeightedRoundRobinAllocator(Map<Node, Integer> weights) {
this.weights = weights;
}
public Node nextNode() {
Node[] nodes = weights.keySet().toArray(new Node[0]);
int maxWeight = 0;
Node selectedNode = null;
for (int i = 0; i < nodes.length; i++) {
int index = (currentIndex + i) % nodes.length;
Node node = nodes[index];
int weight = weights.get(node);
if (weight > maxWeight) {
maxWeight = weight;
selectedNode = node;
}
}
currentIndex = (currentIndex + 1) % nodes.length;
return selectedNode;
}
}
AllocationService
类:管理集群的分配决策,在 decide
方法中,结合集群状态、节点状态以及各种分配策略(如副本分配策略),最终决定写入请求的分配。
public class AllocationService {
public Decision decide(ShardRouting shardRouting, ClusterState state) {
// 检查集群状态是否健康
if (state.getNodes().isYellow() && shardRouting.primary() && state.getMetaData().numberOfDataNodes() < 2) {
return Decision.NO;
}
// 检查节点分配决策
for (NodeAllocationDecider decider : deciders) {
Decision decision = decider.canAllocate(shardRouting, state, null);
if (decision.type() == Decision.Type.NO) {
return decision;
}
}
return Decision.YES;
}
}
ClusterStateListener
接口与 ClusterStateUpdateTask
类:ClusterStateListener
接口用于监听集群状态变化事件,ClusterStateUpdateTask
类实现了对集群状态变化的处理逻辑。在 run
方法中,当集群状态发生变化时,会重新计算负载均衡策略,更新写入请求的分配。
public class ClusterStateUpdateTask implements Runnable {
private final ClusterState state;
private final AllocationService allocationService;
public ClusterStateUpdateTask(ClusterState state, AllocationService allocationService) {
this.state = state;
this.allocationService = allocationService;
}
@Override
public void run() {
// 根据新的集群状态重新计算负载均衡策略
allocationService.updateAllocation(state);
}
}
CircuitBreakerService
类:实现限流逻辑,通过 tripped
方法判断是否超出限流阈值。
public class CircuitBreakerService {
private final Map<String, CircuitBreaker> circuitBreakers;
public CircuitBreakerService() {
this.circuitBreakers = new HashMap<>();
// 初始化不同类型的断路器
circuitBreakers.put("fielddata", new FielddataCircuitBreaker());
circuitBreakers.put("request", new RequestCircuitBreaker());
}
public boolean tripped(String type, long size) {
CircuitBreaker circuitBreaker = circuitBreakers.get(type);
if (circuitBreaker == null) {
return false;
}
return circuitBreaker.tripped(size);
}
}