ElasticSearch检测和处理版本冲突机制
- 版本控制方式
- ElasticSearch使用乐观并发控制,每个文档都有一个版本号。当文档被创建时,版本号初始化为1,每次对文档进行更新操作,版本号都会递增。
- 检测版本冲突
- 当客户端发起更新请求时,会携带期望的版本号。ElasticSearch在执行更新操作前,会对比文档当前的版本号和客户端传递的版本号。如果当前版本号与客户端期望的版本号不一致,就会检测到版本冲突。例如,假设客户端期望更新版本号为3的文档,但此时文档的实际版本号已经是4(因为在客户端获取版本号3后,其他操作已经对文档进行了更新),就会产生版本冲突。
- 处理版本冲突
- 默认情况下,当检测到版本冲突时,ElasticSearch会返回一个HTTP 409 Conflict错误响应,告诉客户端版本冲突,更新操作未成功执行。
开发者在代码层面避免或解决版本冲突的策略
- 重试策略
- 客户端捕获到版本冲突异常(例如HTTP 409错误)后,可以选择重新获取最新版本的文档,基于最新版本进行修改,然后再次尝试更新操作。例如在Java中,使用Elasticsearch Java High - Level REST Client时,可以如下实现:
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
public class ElasticsearchVersionConflictRetry {
private static final int MAX_RETRIES = 3;
private final RestHighLevelClient client;
public ElasticsearchVersionConflictRetry(RestHighLevelClient client) {
this.client = client;
}
public void updateDocument(String index, String id, String json) throws Exception {
int retryCount = 0;
while (true) {
try {
UpdateRequest updateRequest = new UpdateRequest(index, id)
.doc(json, XContentType.JSON);
UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
if (response.getResult() == DocWriteResponse.Result.CREATED ||
response.getResult() == DocWriteResponse.Result.UPDATED) {
break;
}
} catch (Exception e) {
if (retryCount >= MAX_RETRIES) {
throw e;
}
if (e instanceof ElasticsearchException &&
((ElasticsearchException) e).status() == RestStatus.CONFLICT) {
retryCount++;
Thread.sleep(100);
} else {
throw e;
}
}
}
}
}
- 使用
if_seq_no
和if_primary_term
- Elasticsearch 5.0引入了序列号(
_seq_no
)和主分片任期(_primary_term
)。客户端在获取文档时,可以同时获取到这两个值。在更新文档时,将if_seq_no
和if_primary_term
作为参数传递给更新请求。Elasticsearch会对比请求中的这两个值和文档当前的_seq_no
和_primary_term
。只有当它们都匹配时,才会执行更新操作,从而避免版本冲突。例如在使用Elasticsearch Python客户端时:
from elasticsearch import Elasticsearch
es = Elasticsearch()
# 获取文档
doc = es.get(index='your_index', id='your_id')
seq_no = doc['_seq_no']
primary_term = doc['_primary_term']
# 更新文档
update_body = {
"doc": {
"field_to_update": "new_value"
},
"if_seq_no": seq_no,
"if_primary_term": primary_term
}
es.update(index='your_index', id='your_id', body=update_body)
- 批量操作
- 如果有多个更新操作,可以使用批量操作(
bulk
API)。Elasticsearch会按照请求顺序依次执行这些操作,在一定程度上减少版本冲突的可能性。因为批量操作在一个原子请求中处理多个更新,避免了其他请求在中间插入导致版本不一致。例如在使用Elasticsearch Java High - Level REST Client进行批量更新时:
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
public class ElasticsearchBulkUpdate {
private final RestHighLevelClient client;
public ElasticsearchBulkUpdate(RestHighLevelClient client) {
this.client = client;
}
public void bulkUpdate(String index, String[] ids, String[] jsonDocs) throws Exception {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < ids.length; i++) {
UpdateRequest updateRequest = new UpdateRequest(index, ids[i])
.doc(jsonDocs[i], XContentType.JSON);
bulkRequest.add(updateRequest);
}
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (bulkResponse.hasFailures()) {
// 处理失败情况
}
}
}