Elasticsearch?

快速批量資料遷移。這個我們在別名一節中有提到過,這裡一起來看具體的實現吧。

我這裡使用的例項:重建mapping,需要將dm_v1中200萬記錄移動到dm_v3中。應用中使用別名dm所以在資料遷移完成後,只需要將dm指向dm_v3即可啦,這個前面別名一節也詳細描述啦。這裡主要看批量遷移資料。

Scroll批量獲取資料,Bulk批量操作資料。我這裡主要使用java來實現。

1 Elasticsearch 九、別名alias

方法/步驟

scroll查詢用於有效的從Elasticsearch中檢索大量文件,而無需支付深度分頁帶來的開銷。

Scrolling允許我們初始化搜尋將結果從Elasticsearch中按批次分離出來直到沒有更多結果。這個有點像傳統資料庫中的遊標。

Elasticsearch 十二、資料遷移 批量處理

GET /old_index/_search?scroll=1m

{

"query": { "match_all": {}},

"sort" : ["_doc"],

"size": 1000

}

1、因為保持scroll開啟消耗資源,所以我們需要設定超時時間。這裡保持1分鐘的連線

2、_doc是最有效的排序順序。

3、在掃描scan的時候,size是應用到每一個片shard上的,所以每一個批次中文件數量應該是size * number_of_primary_shards

Elasticsearch 十二、資料遷移 批量處理

該請求返回一個Base-64編碼的_scroll_id。現在我們可以通過_scroll_id用_search/scroll介面獲取下個批次的資料。

Elasticsearch 十二、資料遷移 批量處理

Bulk API使執行多次索引或者刪除操作在一個API中完成。這可以極大的提高索引速度。

Elasticsearch 十二、資料遷移 批量處理

好了現在來看Java中的實現吧

private static void getSearchDataByScrolls(QueryBuilder queryBuilder) {

String indexFrom = "dm_v1";

String indexTo = "dm_v3";

int timeMillis = 60000;

SearchResponse scrollResp = client.prepareSearch(indexFrom)

.setScroll(new TimeValue(timeMillis))

.setQuery(queryBuilder).setSize(1000).execute().actionGet();

while (true) {

BulkRequestBuilder bulkRequest = client.prepareBulk();

SearchHit[] hits = scrollResp.getHits().getHits();

System.out.println(hits.length);

if(hits.length > 0){

for (SearchHit searchHit : hits) {

bulkRequest.add(client.prepareIndex(indexTo

, searchHit.getType()

,searchHit.getId())

.setRefresh(true)

.setSource(searchHit.getSource()));

}

bulkRequest.execute().actionGet();

}

scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())

.setScroll(new TimeValue(timeMillis)).execute().actionGet();

if (scrollResp.getHits().getHits().length == 0) {

break;

}

}

}

Elasticsearch 十二、資料遷移 批量處理

執行

我的環境將size改成5000就會報記憶體溢位了,這個要看你們的環境了。

我這裡是5個分片所以size=5000每次處理25000條記錄。size * number_of_primary_shards

Elasticsearch 十二、資料遷移 批量處理

Elasticsearch 十二、資料遷移 批量處理

全部完成,設定的size比較小,200萬記錄差不多也用了半小時。忘了記錄具體執行時長了。

資料完全遷移了,記錄數是一致的。但是佔用磁碟大小不一樣,和總的記錄數有點不一致。說明在平時操作的過程中應該有些內容是沒有被完全收回的。。嗯,應該是這樣吧。

Elasticsearch 十二、資料遷移 批量處理

修改別名。

不過如果是生產環境應該先改別名再遷移資料吧。

Elasticsearch 十二、資料遷移 批量處理

Elasticsearch 十二、資料遷移 批量處理

追加個

SearchType中SCAN在2.1之後不建議使用了

Elasticsearch 十二、資料遷移 批量處理

相關問題答案