快速批量資料遷移。這個我們在別名一節中有提到過,這裡一起來看具體的實現吧。
我這裡使用的例項:重建mapping,需要將dm_v1中200萬記錄移動到dm_v3中。應用中使用別名dm所以在資料遷移完成後,只需要將dm指向dm_v3即可啦,這個前面別名一節也詳細描述啦。這裡主要看批量遷移資料。
Scroll批量獲取資料,Bulk批量操作資料。我這裡主要使用java來實現。
1 Elasticsearch 九、別名alias
方法/步驟
scroll查詢用於有效的從Elasticsearch中檢索大量文件,而無需支付深度分頁帶來的開銷。
Scrolling允許我們初始化搜尋將結果從Elasticsearch中按批次分離出來直到沒有更多結果。這個有點像傳統資料庫中的遊標。
GET /old_index/_search?scroll=1m
{
"query": { "match_all": {}},
"sort" : ["_doc"],
"size": 1000
}
1、因為保持scroll開啟消耗資源,所以我們需要設定超時時間。這裡保持1分鐘的連線
2、_doc是最有效的排序順序。
3、在掃描scan的時候,size是應用到每一個片shard上的,所以每一個批次中文件數量應該是size * number_of_primary_shards
該請求返回一個Base-64編碼的_scroll_id。現在我們可以通過_scroll_id用_search/scroll介面獲取下個批次的資料。
Bulk API使執行多次索引或者刪除操作在一個API中完成。這可以極大的提高索引速度。
好了現在來看Java中的實現吧
private static void getSearchDataByScrolls(QueryBuilder queryBuilder) {
String indexFrom = "dm_v1";
String indexTo = "dm_v3";
int timeMillis = 60000;
SearchResponse scrollResp = client.prepareSearch(indexFrom)
.setScroll(new TimeValue(timeMillis))
.setQuery(queryBuilder).setSize(1000).execute().actionGet();
while (true) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
SearchHit[] hits = scrollResp.getHits().getHits();
System.out.println(hits.length);
if(hits.length > 0){
for (SearchHit searchHit : hits) {
bulkRequest.add(client.prepareIndex(indexTo
, searchHit.getType()
,searchHit.getId())
.setRefresh(true)
.setSource(searchHit.getSource()));
}
bulkRequest.execute().actionGet();
}
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(timeMillis)).execute().actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
}
}
執行
我的環境將size改成5000就會報記憶體溢位了,這個要看你們的環境了。
我這裡是5個分片所以size=5000每次處理25000條記錄。size * number_of_primary_shards
全部完成,設定的size比較小,200萬記錄差不多也用了半小時。忘了記錄具體執行時長了。
資料完全遷移了,記錄數是一致的。但是佔用磁碟大小不一樣,和總的記錄數有點不一致。說明在平時操作的過程中應該有些內容是沒有被完全收回的。。嗯,應該是這樣吧。
修改別名。
不過如果是生產環境應該先改別名再遷移資料吧。
追加個
SearchType中SCAN在2.1之後不建議使用了