使用的 API 及基本步骤
- API:使用
_bulk
API 来批量插入文档。
- 基本步骤:
- 构造请求体:
_bulk
API 的请求体由多个子请求组成,每个子请求包含操作类型(如 index
、create
等)和对应的文档数据。每个子请求独占一行,并且以换行符 \n
分隔。例如:
{ "index" : { "_index" : "your_index", "_id" : "1" } }
{ "field1" : "value1", "field2" : "value2" }
{ "create" : { "_index" : "your_index", "_id" : "2" } }
{ "field1" : "value3", "field2" : "value4" }
- 发送请求:通过 HTTP 客户端(如
curl
)向 Elasticsearch 集群发送 POST
请求,请求地址为 <ES_URL>/_bulk
,并将构造好的请求体作为请求内容发送。例如:
curl -XPOST 'http://localhost:9200/_bulk' -H 'Content-Type: application/json' -d'
{ "index" : { "_index" : "your_index", "_id" : "1" } }
{ "field1" : "value1", "field2" : "value2" }
{ "create" : { "_index" : "your_index", "_id" : "2" } }
{ "field1" : "value3", "field2" : "value4" }
'
部分文档插入失败的返回结果及处理
- 返回结果:
_bulk
API 会返回一个 JSON 格式的响应,其中包含每个子请求的处理结果。如果部分文档插入失败,响应中会明确指出哪些文档插入失败以及失败的原因。例如:
{
"took": 2,
"errors": true,
"items": [
{
"index": {
"_index": "your_index",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 201
}
},
{
"create": {
"_index": "your_index",
"_id": "2",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[2]: version conflict, document already exists (current version [1])",
"index_uuid": "your_index_uuid",
"shard": "0",
"index": "your_index"
}
}
}
]
}
errors
字段为 true
表示存在插入失败的文档。
- 每个子请求的结果在
items
数组中,其中失败的子请求会包含 error
字段,详细说明失败原因。
- 处理方式:
- 分析错误原因:根据返回结果中的
error
字段分析失败原因,如文档已存在(版本冲突)、字段类型不匹配等。
- 重试机制:对于一些可恢复的错误(如网络波动导致的临时错误),可以实现重试逻辑。可以根据失败的文档重新构造
_bulk
请求体进行重试。
- 记录日志:记录失败的文档及错误信息,以便后续分析和排查问题。可以将这些信息记录到日志文件或者发送到专门的日志收集系统。