MST

星途 面试题库

面试题:ElasticSearch中Index操作基本流程及高效执行要点

请阐述ElasticSearch Index操作的基本流程,并且说明在高并发场景下,如何通过配置或代码优化来实现Index操作的高效执行。
24.4万 热度难度
数据库ElasticSearch

知识考点

AI 面试

面试题答案

一键面试

ElasticSearch Index操作基本流程

  1. 客户端请求:客户端向ElasticSearch集群发送Index请求,请求中包含要索引的文档数据及相关元数据,如索引名、类型(在ElasticSearch 7.x 后逐渐弃用类型概念)、文档ID(可由用户指定或ElasticSearch自动生成)。
  2. 路由计算:接收到请求的节点(协调节点)根据索引名和文档ID计算出该文档应该存储在哪个分片上。计算公式大致为:shard = hash(routing) % number_of_primary_shards,其中routing默认是文档ID,也可以自定义。通过这种方式确定目标主分片。
  3. 转发请求:协调节点将请求转发到负责目标主分片的节点。
  4. 写入主分片:主分片所在节点接收到请求后,将文档写入到内存中的buffer,同时记录一条操作日志到translog(事务日志)。buffer中的数据会定期(默认1秒)被刷新到segment文件(这一过程称为refresh),此时数据就可以被搜索到了,但segment文件还在内存中。translog则用于保证数据的可靠性,防止数据丢失,它会定期(默认30分钟或translog大小达到一定阈值)被持久化到磁盘。
  5. 复制到副本分片:主分片写入成功后,会将该操作同步到对应的副本分片。副本分片执行同样的写入操作,先写入buffertranslog。只有当所有副本分片都确认写入成功后,整个Index操作才会返回成功响应给客户端。

高并发场景下优化Index操作的方法

配置优化

  1. 调整刷新频率:适当降低refresh_interval(默认1秒),例如设置为30秒或更长时间。这样可以减少频繁刷新segment文件的开销,但会导致数据在写入后较长时间才能被搜索到。可以在索引创建时通过如下设置:
PUT my_index
{
  "settings": {
    "refresh_interval": "30s"
  }
}
  1. 调整合并策略:ElasticSearch通过合并segment文件来优化存储和查询性能。可以调整merge.policy相关参数,例如max_merge_at_once(一次合并的最大segment数,默认10)和max_merge_at_once_explicit(显式触发合并时一次合并的最大segment数,默认30)。对于高写入场景,可以适当增大这些值,但要注意不要过度增大导致内存和I/O压力过大。例如:
PUT my_index
{
  "settings": {
    "index.merge.policy.max_merge_at_once": 20,
    "index.merge.policy.max_merge_at_once_explicit": 40
  }
}
  1. 增加副本数量:在集群资源允许的情况下,适当增加副本分片数量,这样可以分散读请求,提高整体的并发处理能力。但增加副本也会增加写入负担,因为每次写入都需要同步到副本分片,所以需要平衡读写需求。在索引创建时设置副本数量:
PUT my_index
{
  "settings": {
    "number_of_replicas": 2
  }
}
  1. 优化translog设置:适当增大translog.durabilitysync_interval(默认5秒),减少translog的同步频率,降低磁盘I/O开销,但同时也会增加数据丢失的风险。设置如下:
PUT my_index
{
  "settings": {
    "index.translog.durability": "async",
    "index.translog.sync_interval": "10s"
  }
}

代码优化

  1. 批量操作:使用批量(Bulk)API将多个Index请求合并为一个请求发送到ElasticSearch,这样可以减少网络开销。例如在Java中使用Elasticsearch Java High Level REST Client:
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest("my_index").id("1").source(XContentType.JSON, "field1", "value1");
IndexRequest indexRequest2 = new IndexRequest("my_index").id("2").source(XContentType.JSON, "field1", "value2");
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(
                new HttpHost("localhost", 9200, "http")));
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
  1. 异步操作:利用ElasticSearch客户端提供的异步API,如Java High Level REST Client中的异步方法,将Index操作提交到线程池执行,避免阻塞主线程,提高应用程序的整体响应性能。例如:
IndexRequest indexRequest = new IndexRequest("my_index").id("1").source(XContentType.JSON, "field1", "value1");
client.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        // 处理响应
    }

    @Override
    public void onFailure(Exception e) {
        // 处理异常
    }
});
  1. 数据预处理:在发送Index请求前,对数据进行必要的预处理,如压缩、去重等,减少传输数据量,提高写入效率。例如对文本数据进行压缩:
String originalData = "a very long text...";
byte[] compressedData = CompressorFactory.getDefaultCompressor().compress(originalData.getBytes());
IndexRequest indexRequest = new IndexRequest("my_index").id("1").source("compressed_field", compressedData);