ElasticSearch Index操作基本流程
- 客户端请求:客户端向ElasticSearch集群发送Index请求,请求中包含要索引的文档数据及相关元数据,如索引名、类型(在ElasticSearch 7.x 后逐渐弃用类型概念)、文档ID(可由用户指定或ElasticSearch自动生成)。
- 路由计算:接收到请求的节点(协调节点)根据索引名和文档ID计算出该文档应该存储在哪个分片上。计算公式大致为:
shard = hash(routing) % number_of_primary_shards
,其中routing
默认是文档ID,也可以自定义。通过这种方式确定目标主分片。
- 转发请求:协调节点将请求转发到负责目标主分片的节点。
- 写入主分片:主分片所在节点接收到请求后,将文档写入到内存中的
buffer
,同时记录一条操作日志到translog
(事务日志)。buffer
中的数据会定期(默认1秒)被刷新到segment
文件(这一过程称为refresh),此时数据就可以被搜索到了,但segment
文件还在内存中。translog
则用于保证数据的可靠性,防止数据丢失,它会定期(默认30分钟或translog
大小达到一定阈值)被持久化到磁盘。
- 复制到副本分片:主分片写入成功后,会将该操作同步到对应的副本分片。副本分片执行同样的写入操作,先写入
buffer
和translog
。只有当所有副本分片都确认写入成功后,整个Index操作才会返回成功响应给客户端。
高并发场景下优化Index操作的方法
配置优化
- 调整刷新频率:适当降低
refresh_interval
(默认1秒),例如设置为30秒或更长时间。这样可以减少频繁刷新segment
文件的开销,但会导致数据在写入后较长时间才能被搜索到。可以在索引创建时通过如下设置:
PUT my_index
{
"settings": {
"refresh_interval": "30s"
}
}
- 调整合并策略:ElasticSearch通过合并
segment
文件来优化存储和查询性能。可以调整merge.policy
相关参数,例如max_merge_at_once
(一次合并的最大segment
数,默认10)和max_merge_at_once_explicit
(显式触发合并时一次合并的最大segment
数,默认30)。对于高写入场景,可以适当增大这些值,但要注意不要过度增大导致内存和I/O压力过大。例如:
PUT my_index
{
"settings": {
"index.merge.policy.max_merge_at_once": 20,
"index.merge.policy.max_merge_at_once_explicit": 40
}
}
- 增加副本数量:在集群资源允许的情况下,适当增加副本分片数量,这样可以分散读请求,提高整体的并发处理能力。但增加副本也会增加写入负担,因为每次写入都需要同步到副本分片,所以需要平衡读写需求。在索引创建时设置副本数量:
PUT my_index
{
"settings": {
"number_of_replicas": 2
}
}
- 优化
translog
设置:适当增大translog.durability
的sync_interval
(默认5秒),减少translog
的同步频率,降低磁盘I/O开销,但同时也会增加数据丢失的风险。设置如下:
PUT my_index
{
"settings": {
"index.translog.durability": "async",
"index.translog.sync_interval": "10s"
}
}
代码优化
- 批量操作:使用批量(Bulk)API将多个Index请求合并为一个请求发送到ElasticSearch,这样可以减少网络开销。例如在Java中使用Elasticsearch Java High Level REST Client:
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest("my_index").id("1").source(XContentType.JSON, "field1", "value1");
IndexRequest indexRequest2 = new IndexRequest("my_index").id("2").source(XContentType.JSON, "field1", "value2");
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http")));
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
- 异步操作:利用ElasticSearch客户端提供的异步API,如Java High Level REST Client中的异步方法,将Index操作提交到线程池执行,避免阻塞主线程,提高应用程序的整体响应性能。例如:
IndexRequest indexRequest = new IndexRequest("my_index").id("1").source(XContentType.JSON, "field1", "value1");
client.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
// 处理响应
}
@Override
public void onFailure(Exception e) {
// 处理异常
}
});
- 数据预处理:在发送Index请求前,对数据进行必要的预处理,如压缩、去重等,减少传输数据量,提高写入效率。例如对文本数据进行压缩:
String originalData = "a very long text...";
byte[] compressedData = CompressorFactory.getDefaultCompressor().compress(originalData.getBytes());
IndexRequest indexRequest = new IndexRequest("my_index").id("1").source("compressed_field", compressedData);