1. 制定标准化规范
- 明确新的文档操作定义,包括文档结构、字段类型、索引策略等具体标准。
2. 设计过渡方案
- 双写模式:
- 在业务代码中,对新写入的数据,同时按照新旧两种规范写入 ElasticSearch。旧数据按原方式写,新数据按新规范写一份。这样既保证业务不受影响,又开始积累新规范的数据。
- 代码示例(以 Java 为例,使用 Elasticsearch Java High - Level REST Client):
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http")));
// 旧规范写入
IndexRequest oldRequest = new IndexRequest("old_index")
.id("1")
.source("field1", "value1", XContentType.JSON);
client.index(oldRequest, RequestOptions.DEFAULT);
// 新规范写入
IndexRequest newRequest = new IndexRequest("new_index")
.id("1")
.source("new_field1", "value1", XContentType.JSON);
client.index(newRequest, RequestOptions.DEFAULT);
- 版本控制:在文档中添加版本字段,标识数据遵循的规范版本。例如,添加
version
字段,值为 "old" 或 "new",方便区分和处理。
3. 数据迁移
- 全量迁移:
- 停机迁移:选择业务低峰期,暂停业务写入,对旧数据进行全量读取,按照新规范进行转换后写入新的索引。
- 不停机迁移:利用 Elasticsearch 的快照和恢复功能,对旧数据创建快照,然后在新的索引结构中恢复并转换数据。同时,在迁移过程中,业务的读写操作仍可正常进行,但需注意数据的一致性。
- 代码示例(以 Python 为例,使用 Elasticsearch Python client):
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
# 获取旧索引数据
old_data = es.search(index='old_index', body={"query": {"match_all": {}}})
# 转换数据
new_data = []
for hit in old_data['hits']['hits']:
source = hit['_source']
new_source = {
"new_field1": source.get("field1")
}
new_data.append(new_source)
# 写入新索引
for data in new_data:
es.index(index='new_index', body=data)
- 增量迁移:在全量迁移完成后,继续使用双写模式一段时间,将增量数据实时迁移到新规范。当确认新规范的数据完整且稳定后,逐步淘汰旧数据和旧索引。
4. 确保兼容性
- 查询兼容:
- 在业务查询代码中,根据文档的版本字段,动态调整查询逻辑。如果是旧版本数据,使用旧的查询方式;如果是新版本数据,使用新的查询方式。
- 代码示例(以 JavaScript 为例,使用 Elasticsearch JavaScript client):
const { Client } = require('@elastic/elasticsearch');
const client = new Client({ node: 'http://localhost:9200' });
async function searchData() {
const response = await client.search({
index: 'combined_index',
body: {
query: {
match_all: {}
}
}
});
response.hits.hits.forEach(hit => {
if (hit._source.version === 'old') {
// 旧版本查询处理逻辑
} else {
// 新版本查询处理逻辑
}
});
}
- 数据格式兼容:在新旧规范转换过程中,确保数据的关键信息不丢失。对于新规范中新增的字段,可以设置默认值;对于旧规范中废弃的字段,在迁移时可以考虑保留一段时间,以保证兼容性。
5. 处理数据一致性问题
- 乐观锁机制:在更新文档时,使用版本号进行乐观锁控制。每次更新前获取文档的当前版本号,更新时将版本号作为参数传递。如果版本号不一致,说明数据已被其他操作修改,需要重新获取数据并更新。
IndexRequest request = new IndexRequest("index")
.id("1")
.source("field1", "new_value", XContentType.JSON)
.version(1); // 假设初始版本号为 1
try {
client.index(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
// 版本冲突,重新获取数据并更新
GetRequest getRequest = new GetRequest("index", "1");
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
long newVersion = getResponse.getVersion();
request.version(newVersion);
client.index(request, RequestOptions.DEFAULT);
}
}
- 数据校验与修复:定期对迁移后的数据进行校验,比对新旧版本数据的关键信息。可以通过编写脚本,计算数据的哈希值或其他校验和,发现不一致的数据及时进行修复。例如,使用 Python 脚本遍历新旧索引数据,对比关键字段:
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
old_index_data = es.search(index='old_index', body={"query": {"match_all": {}}})
new_index_data = es.search(index='new_index', body={"query": {"match_all": {}}})
for old_hit in old_index_data['hits']['hits']:
old_source = old_hit['_source']
new_hit = next((hit for hit in new_index_data['hits']['hits'] if hit['_id'] == old_hit['_id']), None)
if new_hit:
new_source = new_hit['_source']
if old_source['key_field']!= new_source['new_key_field']:
# 数据不一致,进行修复
es.update(index='new_index', id=old_hit['_id'], body={"doc": {"new_key_field": old_source['key_field']}})