如何把数据快速批量添加到Elasticsearch中
1个回答
2017-10-24
展开全部
scroll查询用于有效的从Elasticsearch中检索大量文档,而无需支付深度分页带来的开销。
Scrolling允许我们初始化搜索将结果从Elasticsearch中按批次分离出来直到没有更多结果。这个有点像传统数据库中的游标。
GET /old_index/_search?scroll=1m
{
"query": { "match_all": {}},
"sort" : ["_doc"],
"size": 1000
}
1、因为保持scroll打开消耗资源,所以我们需要设置超时时间。这里保持1分钟的连接
2、_doc是最有效的排序顺序。
3、在扫描scan的时候,size是应用到每一个片shard上的,所以每一个批次中文档数量应该是size * number_of_primary_shards
该请求返回一个Base-64编码的_scroll_id。现在我们可以通过_scroll_id用_search/scroll接口获取下个批次的数据。
Bulk API使执行多次索引或者删除操作在一个API中完成。这可以极大的提高索引速度。
好了现在来看Java中的实现吧
private static void getSearchDataByScrolls(QueryBuilder queryBuilder) {
String indexFrom = "dm_v1";
String indexTo = "dm_v3";
int timeMillis = 60000;
SearchResponse scrollResp = client.prepareSearch(indexFrom)
.setScroll(new TimeValue(timeMillis))
.setQuery(queryBuilder).setSize(1000).execute().actionGet();
while (true) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
SearchHit[] hits = scrollResp.getHits().getHits();
System.out.println(hits.length);
if(hits.length > 0){
for (SearchHit searchHit : hits) {
bulkRequest.add(client.prepareIndex(indexTo
, searchHit.getType()
,searchHit.getId())
.setRefresh(true)
.setSource(searchHit.getSource()));
}
bulkRequest.execute().actionGet();
}
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(timeMillis)).execute().actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
}
}
执行
我的环境将size改成5000就会报内存溢出了,这个要看你们的环境了。
我这里是5个分片所以size=5000每次处理25000条记录。size * number_of_primary_shards
全部完成,设置的size比较小,200万记录差不多也用了半小时。忘了记录具体执行时长了。
数据完全迁移了,记录数是一致的。但是占用磁盘大小不一样,和总的记录数有点不一致。说明在平时操作的过程中应该有些内容是没有被完全收回的。。嗯,应该是这样吧。
修改别名。
不过如果是生产环境应该先改别名再迁移数据吧。
追加个
SearchType中SCAN在2.1之后不建议使用了
Scrolling允许我们初始化搜索将结果从Elasticsearch中按批次分离出来直到没有更多结果。这个有点像传统数据库中的游标。
GET /old_index/_search?scroll=1m
{
"query": { "match_all": {}},
"sort" : ["_doc"],
"size": 1000
}
1、因为保持scroll打开消耗资源,所以我们需要设置超时时间。这里保持1分钟的连接
2、_doc是最有效的排序顺序。
3、在扫描scan的时候,size是应用到每一个片shard上的,所以每一个批次中文档数量应该是size * number_of_primary_shards
该请求返回一个Base-64编码的_scroll_id。现在我们可以通过_scroll_id用_search/scroll接口获取下个批次的数据。
Bulk API使执行多次索引或者删除操作在一个API中完成。这可以极大的提高索引速度。
好了现在来看Java中的实现吧
private static void getSearchDataByScrolls(QueryBuilder queryBuilder) {
String indexFrom = "dm_v1";
String indexTo = "dm_v3";
int timeMillis = 60000;
SearchResponse scrollResp = client.prepareSearch(indexFrom)
.setScroll(new TimeValue(timeMillis))
.setQuery(queryBuilder).setSize(1000).execute().actionGet();
while (true) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
SearchHit[] hits = scrollResp.getHits().getHits();
System.out.println(hits.length);
if(hits.length > 0){
for (SearchHit searchHit : hits) {
bulkRequest.add(client.prepareIndex(indexTo
, searchHit.getType()
,searchHit.getId())
.setRefresh(true)
.setSource(searchHit.getSource()));
}
bulkRequest.execute().actionGet();
}
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(timeMillis)).execute().actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
}
}
执行
我的环境将size改成5000就会报内存溢出了,这个要看你们的环境了。
我这里是5个分片所以size=5000每次处理25000条记录。size * number_of_primary_shards
全部完成,设置的size比较小,200万记录差不多也用了半小时。忘了记录具体执行时长了。
数据完全迁移了,记录数是一致的。但是占用磁盘大小不一样,和总的记录数有点不一致。说明在平时操作的过程中应该有些内容是没有被完全收回的。。嗯,应该是这样吧。
修改别名。
不过如果是生产环境应该先改别名再迁移数据吧。
追加个
SearchType中SCAN在2.1之后不建议使用了
推荐律师服务:
若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询