Создание DataFrame из результатов ElasticSearch
Я пытаюсь создать DataFrame в pandas, используя результаты очень простого запроса к ElasticSearch. Я получаю данные, которые мне нужны, но это вопрос обрезания результатов способом построения правильного фрейма данных. Я действительно забочусь только о том, чтобы получить метку времени и путь каждого результата. Я пробовал несколько разных шаблонов es.search.
код:
from datetime import datetime
from elasticsearch import Elasticsearch
from pandas import DataFrame, Series
import pandas as pd
import matplotlib.pyplot as plt
es = Elasticsearch(host="192.168.121.252")
res = es.search(index="_all", doc_type='logs', body={"query": {"match_all": {}}}, size=2, fields=('path','@timestamp'))
Это дает 4 куска данных. [u'hits ', u'_shards', u'took ', u'timed_out']. Мои результаты попадают в хиты.
res['hits']['hits']
Out[47]:
[{u'_id': u'a1XHMhdHQB2uV7oq6dUldg',
u'_index': u'logstash-2014.08.07',
u'_score': 1.0,
u'_type': u'logs',
u'fields': {u'@timestamp': u'2014-08-07T12:36:00.086Z',
u'path': u'app2.log'}},
{u'_id': u'TcBvro_1QMqF4ORC-XlAPQ',
u'_index': u'logstash-2014.08.07',
u'_score': 1.0,
u'_type': u'logs',
u'fields': {u'@timestamp': u'2014-08-07T12:36:00.200Z',
u'path': u'app1.log'}}]
Единственное, что меня волнует, - получить метку времени и путь для каждого попадания.
res['hits']['hits'][0]['fields']
Out[48]:
{u'@timestamp': u'2014-08-07T12:36:00.086Z',
u'path': u'app1.log'}
Я не могу, чтобы жизнь меня определяла, кто получит этот результат, в кадр данных в pandas. Итак, для двух результатов, которые я вернул, я ожидал бы, например, dataframe.
timestamp path
0 2014-08-07T12:36:00.086Z app1.log
1 2014-08-07T12:36:00.200Z app2.log
Ответы
Ответ 1
Есть хорошая игрушка под названием pd.DataFrame.from_dict
, которую вы можете использовать в такой ситуации:
In [34]:
Data = [{u'_id': u'a1XHMhdHQB2uV7oq6dUldg',
u'_index': u'logstash-2014.08.07',
u'_score': 1.0,
u'_type': u'logs',
u'fields': {u'@timestamp': u'2014-08-07T12:36:00.086Z',
u'path': u'app2.log'}},
{u'_id': u'TcBvro_1QMqF4ORC-XlAPQ',
u'_index': u'logstash-2014.08.07',
u'_score': 1.0,
u'_type': u'logs',
u'fields': {u'@timestamp': u'2014-08-07T12:36:00.200Z',
u'path': u'app1.log'}}]
In [35]:
df = pd.concat(map(pd.DataFrame.from_dict, Data), axis=1)['fields'].T
In [36]:
print df.reset_index(drop=True)
@timestamp path
0 2014-08-07T12:36:00.086Z app2.log
1 2014-08-07T12:36:00.200Z app1.log
Покажите его в четыре этапа:
1, Прочитайте каждый элемент в списке (который является dictionary
) в DataFrame
2, мы можем поместить все элементы в список в большой DataFrame
на concat
их по-строке, так как мы сделаем шаг №1 для каждого элемента, мы можем использовать map
для этого.
3, Затем мы обращаемся к столбцам с меткой 'fields'
4 Мы, вероятно, хотим повернуть DataFrame
90 градусов (транспонирование) и reset_index
, если хотим, чтобы индекс был по умолчанию int
.
![enter image description here]()
Ответ 2
Или вы можете использовать функцию json_normalize pandas:
from pandas.io.json import json_normalize
df = json_normalize(res['hits']['hits'])
И затем фильтрация результирующего фрейма по именам столбцов
Ответ 3
Еще лучше, вы можете использовать фантастическую библиотеку pandasticsearch
:
from elasticsearch import Elasticsearch
es = Elasticsearch('http://localhost:9200')
result_dict = es.search(index="recruit", body={"query": {"match_all": {}}})
from pandasticsearch import Select
pandas_df = Select.from_dict(result_dict).to_pandas()
Ответ 4
Я проверил все ответы на производительность и обнаружил, что подход pandasticsearch
является самым быстрым с большим отрывом:
Тесты:
test1 (используя from_dict)
%timeit -r 2 -n 5 teste1(resp)
10,5 с ± 247 мс на цикл (среднее ± стандартное отклонение из 2 циклов, по 5 циклов в каждом)
test2 (используя список)
%timeit -r 2 -n 5 teste2(resp)
2,05 с ± 8,17 мс на цикл (среднее ± стандартное отклонение из 2 циклов, по 5 циклов в каждом)
test3 (используя импорт pandasticsearch в качестве pdes)
%timeit -r 2 -n 5 teste3(resp)
39,2 мс ± 5,89 мс на цикл (среднее ± стандартное отклонение из 2 циклов по 5 циклов в каждом)
test4 (используя импорт из pandas.io.json json_normalize)
%timeit -r 2 -n 5 teste4(resp)
387 мс ± 19 мс на цикл (среднее ± стандартное отклонение из 2 циклов, по 5 циклов в каждом)
Я надеюсь, что это может быть полезным для всех
КОД:
index = 'teste_85'
size = 10000
fields = True
sort = ['col1','desc']
query = 'teste'
range_gte = '2016-01-01'
range_lte = 'now'
resp = esc.search(index = index,
size = size,
scroll = '2m',
_source = fields,
doc_type = '_doc',
body = {
"sort" : { "{0}".format(sort[0]) : {"order" : "{0}".format(sort[1])}},
"query": {
"bool": {
"must": [
{ "query_string": { "query": "{0}".format(query) } },
{ "range": { "anomes": { "gte": "{0}".format(range_gte), "lte": "{0}".format(range_lte) } } },
]
}
}
})
def teste1(resp):
df = pd.DataFrame(columns=list(resp['hits']['hits'][0]['_source'].keys()))
for hit in resp['hits']['hits']:
df = df.append(df.from_dict(hit['_source'], orient='index').T)
return df
def teste2(resp):
col=list(resp['hits']['hits'][0]['_source'].keys())
for hit in resp['hits']['hits']:
df = pd.DataFrame(list(hit['_source'].values()), col).T
return df
def teste3(resp):
df = pdes.Select.from_dict(resp).to_pandas()
return df
def teste4(resp):
df = json_normalize(resp['hits']['hits'])
return df
Ответ 5
Здесь немного кода, который может оказаться полезным для вашей работы. Он прост и расширяем, но сэкономил мне много времени, когда вы столкнулись с просто "хватанием" некоторых данных от ElasticSearch для анализа.
Если вы просто хотите захватить все данные данного индекса и doc_type вашего локального хоста, вы можете сделать:
df = ElasticCom(index='index', doc_type='doc_type').search_and_export_to_df()
Вы можете использовать любой из аргументов, которые вы обычно использовали в elasticsearch.search(), или указать другой хост. Вы также можете выбрать, включать ли _id или нет, и указать, находятся ли данные в '_source' или 'fields' (он пытается угадать). Он также пытается преобразовать значения полей по умолчанию (но вы можете отключить это).
Здесь код:
from elasticsearch import Elasticsearch
import pandas as pd
class ElasticCom(object):
def __init__(self, index, doc_type, hosts='localhost:9200', **kwargs):
self.index = index
self.doc_type = doc_type
self.es = Elasticsearch(hosts=hosts, **kwargs)
def search_and_export_to_dict(self, *args, **kwargs):
_id = kwargs.pop('_id', True)
data_key = kwargs.pop('data_key', kwargs.get('fields')) or '_source'
kwargs = dict({'index': self.index, 'doc_type': self.doc_type}, **kwargs)
if kwargs.get('size', None) is None:
kwargs['size'] = 1
t = self.es.search(*args, **kwargs)
kwargs['size'] = t['hits']['total']
return get_search_hits(self.es.search(*args, **kwargs), _id=_id, data_key=data_key)
def search_and_export_to_df(self, *args, **kwargs):
convert_numeric = kwargs.pop('convert_numeric', True)
convert_dates = kwargs.pop('convert_dates', 'coerce')
df = pd.DataFrame(self.search_and_export_to_dict(*args, **kwargs))
if convert_numeric:
df = df.convert_objects(convert_numeric=convert_numeric, copy=True)
if convert_dates:
df = df.convert_objects(convert_dates=convert_dates, copy=True)
return df
def get_search_hits(es_response, _id=True, data_key=None):
response_hits = es_response['hits']['hits']
if len(response_hits) > 0:
if data_key is None:
for hit in response_hits:
if '_source' in hit.keys():
data_key = '_source'
break
elif 'fields' in hit.keys():
data_key = 'fields'
break
if data_key is None:
raise ValueError("Neither _source nor fields were in response hits")
if _id is False:
return [x.get(data_key, None) for x in response_hits]
else:
return [dict(_id=x['_id'], **x.get(data_key, {})) for x in response_hits]
else:
return []
Ответ 6
Для всех, кто сталкивается с этим вопросом, тоже. @CT Zhu имеет хороший ответ, но я думаю, что он немного устарел. но когда вы используете пакетasticsearch_dsl. Результат немного другой. Попробуйте это в этом случае:
# Obtain the results..
res = es_dsl.Search(using=con, index='_all')
res_content = res[0:100].execute()
# convert it to a list of dicts, by using the .to_dict() function
res_filtered = [x['_source'].to_dict() for x in res_content['hits']['hits']]
# Pass this on to the 'from_dict' function
A = pd.DataFrame.from_dict(res_filtered)
Ответ 7
Если ваш запрос может вернуть более 10 000 документов из Elasticsearch, вам нужно будет использовать функцию прокрутки Elasticsearch. Документацию и примеры для этой функции довольно сложно найти, поэтому я приведу вам полный рабочий пример:
import pandas as pd
from elasticsearch import Elasticsearch
import elasticsearch.helpers
es = Elasticsearch('http://localhost:9200')
body={"query": {"match_all": {}}}
results = elasticsearch.helpers.scan(es, query=body, index="my_index")
df = pd.DataFrame.from_dict([document['_source'] for document in results])
Просто отредактируйте поля, начинающиеся с "my_", чтобы они соответствовали вашим собственным значениям.