Повторный поиск Упругий поиск через Bulk API, сканирование и прокрутку
Я пытаюсь повторно проиндексировать мою настройку поиска Elastic, в настоящее время смотрю документацию поиска Elastic и используя API-интерфейс Python
Я немного смущен тем, как все это работает. Мне удалось получить идентификатор прокрутки из API Python:
es = Elasticsearch("myhost")
index = "myindex"
query = {"query":{"match_all":{}}}
response = es.search(index= index, doc_type= "my-doc-type", body= query, search_type= "scan", scroll= "10m")
scroll_id = response["_scroll_id"]
Теперь мой вопрос: какое использование для меня? Что мне известно о прокрутке? В документации говорится, что использовать "Bulk API", но я понятия не имею, как факторы scoll_id в этом, это было немного запутанно.
Может ли кто-нибудь дать краткий пример, показывающий, как мне переиндексировать с этого момента, учитывая, что у меня есть scroll_id правильно?
Ответы
Ответ 1
здесь приведен пример переиндексации на другой поиск elasticsearch node с использованием elasticsearch-py:
from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des)
вы также можете переиндексировать результат запроса к другому индексу, вот как это сделать:
from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])
body = {"query": {"term": {"year": "2004"}}}
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des, query=body)
Ответ 2
Привет, вы можете использовать прокрутку api, чтобы просмотреть все документы самым эффективным способом. С помощью scroll_id вы можете найти сеанс, который хранится на сервере для вашего конкретного запроса прокрутки. Поэтому вам нужно предоставить scroll_id с каждым запросом, чтобы получить больше элементов.
Объемный api предназначен для более эффективных индексирующих документов. При копировании и индексе вам нужны оба, но они действительно не связаны.
У меня есть код Java, который поможет вам лучше понять, как это работает.
public void reIndex() {
logger.info("Start creating a new index based on the old index.");
SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
.setQuery(matchAllQuery())
.setSearchType(SearchType.SCAN)
.setScroll(createScrollTimeoutValue())
.setSize(SCROLL_SIZE).execute().actionGet();
BulkProcessor bulkProcessor = BulkProcessor.builder(client,
createLoggingBulkProcessorListener()).setBulkActions(BULK_ACTIONS_THRESHOLD)
.setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
.setFlushInterval(createFlushIntervalTime())
.build();
while (true) {
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(createScrollTimeoutValue()).execute().actionGet();
if (searchResponse.getHits().getHits().length == 0) {
logger.info("Closing the bulk processor");
bulkProcessor.close();
break; //Break condition: No hits are returned
}
for (SearchHit hit : searchResponse.getHits()) {
IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
request.source(hit.sourceRef());
bulkProcessor.add(request);
}
}
}
Ответ 3
Для всех, кто сталкивается с этой проблемой, вы можете использовать следующий API от клиента Python для переиндексации:
https://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex
Это поможет вам избежать прокрутки и поиска, чтобы получить все данные и использовать массовый API для размещения данных в новом индексе.
Ответ 4
Лучший способ переиндексации - это использование встроенного в Reindex API Elasticsearch, поскольку он хорошо поддерживается и устойчив к известным проблемам.
API-интерфейс Elasticsaerch Reindex использует пакетную и массовую индексацию в пакетном режиме и позволяет преобразовывать данные по сценариям. В Python может быть разработана похожая процедура:
#!/usr/local/bin/python
from elasticsearch import Elasticsearch
from elasticsearch import helpers
src = Elasticsearch(['localhost:9202'])
dst = Elasticsearch(['localhost:9200'])
body = {"query": { "match_all" : {}}}
source_index='src-index'
target_index='dst-index'
scroll_time='60s'
batch_size='500'
def transform(hits):
for h in hits:
h['_index'] = target_index
yield h
rs = src.search(index=[source_index],
scroll=scroll_time,
size=batch_size,
body=body
)
helpers.bulk(dst, transform(rs['hits']['hits']), chunk_size=batch_size)
while True:
scroll_id = rs['_scroll_id']
rs = src.scroll(scroll_id=scroll_id, scroll=scroll_time)
if len(rs['hits']['hits']) > 0:
helpers.bulk(dst, transform(rs['hits']['hits']), chunk_size=batch_size)
else:
break;