我正在尝试重新索引我的Elastic搜索设置,目前正在查看Elastic的搜索文档和使用Python API 的示例
不过,我对这一切是如何运作的有点困惑。我能够从Python API获得滚动ID:
es = Elasticsearch("myhost")
index = "myindex"
query = {"query":{"match_all":{}}}
response = es.search(index= index, doc_type= "my-doc-type", body= query, search_type= "scan", scroll= "10m")
scroll_id = response["_scroll_id"]
现在我的问题是,这对我有什么用?知道滚动id会给我什么?文档中说要使用"Bulk API",但我不知道scoll_id是如何影响这一点的,这有点令人困惑。
考虑到我已经正确地获得了scroll_id,有人能举一个简单的例子来展示我如何从这一点重新索引吗?
这里有一个使用弹性搜索py重新索引到另一个弹性搜索节点的示例:
from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des)
您也可以将查询结果重新索引到不同的索引中。以下是如何做到的:
from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])
body = {"query": {"term": {"year": "2004"}}}
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des, query=body)
Hi您可以使用滚动api以最有效的方式浏览所有文档。使用scroll_id,您可以为您的特定滚动请求找到存储在服务器上的会话。因此,您需要为每个请求提供scroll_id,以获得更多项目。
批量api用于更高效地索引文档。当复制和索引时,您需要两者,但它们并不是真正相关的。
我确实有一些java代码可以帮助你更好地了解它的工作原理。
public void reIndex() {
logger.info("Start creating a new index based on the old index.");
SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
.setQuery(matchAllQuery())
.setSearchType(SearchType.SCAN)
.setScroll(createScrollTimeoutValue())
.setSize(SCROLL_SIZE).execute().actionGet();
BulkProcessor bulkProcessor = BulkProcessor.builder(client,
createLoggingBulkProcessorListener()).setBulkActions(BULK_ACTIONS_THRESHOLD)
.setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
.setFlushInterval(createFlushIntervalTime())
.build();
while (true) {
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(createScrollTimeoutValue()).execute().actionGet();
if (searchResponse.getHits().getHits().length == 0) {
logger.info("Closing the bulk processor");
bulkProcessor.close();
break; //Break condition: No hits are returned
}
for (SearchHit hit : searchResponse.getHits()) {
IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
request.source(hit.sourceRef());
bulkProcessor.add(request);
}
}
}
对于遇到此问题的任何人,您可以从Python客户端使用以下API来重新索引:
https://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex
这将帮助您避免必须滚动和搜索才能获得所有数据,并使用批量API将数据放入新索引。
重新索引的最佳方法是使用Elasticsearch内置的reindex API,因为它得到了很好的支持,并且对已知问题具有弹性。
Elasticsaerch Reindex API使用批量滚动和批量索引,并允许数据的脚本转换。在Python中,可以开发类似的例程:
#!/usr/local/bin/python
from elasticsearch import Elasticsearch
from elasticsearch import helpers
src = Elasticsearch(['localhost:9202'])
dst = Elasticsearch(['localhost:9200'])
body = {"query": { "match_all" : {}}}
source_index='src-index'
target_index='dst-index'
scroll_time='60s'
batch_size='500'
def transform(hits):
for h in hits:
h['_index'] = target_index
yield h
rs = src.search(index=[source_index],
scroll=scroll_time,
size=batch_size,
body=body
)
helpers.bulk(dst, transform(rs['hits']['hits']), chunk_size=batch_size)
while True:
scroll_id = rs['_scroll_id']
rs = src.scroll(scroll_id=scroll_id, scroll=scroll_time)
if len(rs['hits']['hits']) > 0:
helpers.bulk(dst, transform(rs['hits']['hits']), chunk_size=batch_size)
else:
break;