Индексирование JSON массивов Logstash
Логсташ потрясающий. Я могу отправить JSON таким образом (многослойный для удобочитаемости):
{
"a": "one"
"b": {
"alpha":"awesome"
}
}
И затем запрос для этой строки в кибане, используя поисковый запрос b.alpha:awesome
. Ницца.
Однако теперь у меня есть строка журнала JSON:
{
"different":[
{
"this": "one",
"that": "uno"
},
{
"this": "two"
}
]
}
И я хотел бы найти эту строку с поиском, например different.this:two
(или different.this:one
или different.that:uno
)
Если бы я напрямую использовал Lucene, я бы перебирал массив different
и создавал новый индекс поиска для каждого хэша внутри него, но Logstash в настоящее время, похоже, глотает эту строку следующим образом:
different: {this: one, that: uno}, {this: two}
Что не поможет мне искать строки журнала, используя different.this
или different.that
.
Есть ли какие-либо мысли относительно кодека, фильтра или изменения кода, которые я могу сделать, чтобы включить это?
Ответы
Ответ 1
Вы можете написать свой собственный filter (скопируйте и вставьте, переименуйте имя класса, config_name
и перепишите filter(event)
метод) или изменить текущий JSON фильтр (источник на Github)
Вы можете найти исходный код фильтра JSON (Ruby class) в следующем пути logstash-1.x.x\lib\logstash\filters
, названном как json.rb
. Фильтр JSON анализирует содержимое как JSON следующим образом
begin
# TODO(sissel): Note, this will not successfully handle json lists
# like your text is '[ 1,2,3 ]' JSON.parse gives you an array (correctly)
# which won't merge into a hash. If someone needs this, we can fix it
# later.
dest.merge!(JSON.parse(source))
# If no target, we target the root of the event object. This can allow
# you to overwrite @timestamp. If so, let parse it as a timestamp!
if [email protected] && event[TIMESTAMP].is_a?(String)
# This is a hack to help folks who are mucking with @timestamp during
# their json filter. You aren't supposed to do anything with
# "@timestamp" outside of the date filter, but nobody listens... ;)
event[TIMESTAMP] = Time.parse(event[TIMESTAMP]).utc
end
filter_matched(event)
rescue => e
event.tag("_jsonparsefailure")
@logger.warn("Trouble parsing json", :source => @source,
:raw => event[@source], :exception => e)
return
end
Вы можете изменить процедуру синтаксического анализа, чтобы изменить исходный JSON
json = JSON.parse(source)
if json.is_a?(Hash)
json.each do |key, value|
if value.is_a?(Array)
value.each_with_index do |object, index|
#modify as you need
object["index"]=index
end
end
end
end
#save modified json
......
dest.merge!(json)
то вы можете изменить свой файл конфигурации, чтобы использовать/ваш новый/модифицированный фильтр JSON и поместить его в \logstash-1.x.x\lib\logstash\config
Это мой elastic_with_json.conf
с модифицированным фильтром json.rb
input{
stdin{
}
}filter{
json{
source => "message"
}
}output{
elasticsearch{
host=>localhost
}stdout{
}
}
если вы хотите использовать свой новый фильтр, вы можете настроить его с помощью config_name
class LogStash::Filters::Json_index < LogStash::Filters::Base
config_name "json_index"
milestone 2
....
end
и настройте его
input{
stdin{
}
}filter{
json_index{
source => "message"
}
}output{
elasticsearch{
host=>localhost
}stdout{
}
}
Надеюсь, что это поможет.
Ответ 2
Для быстрого и грязного взлома я использовал фильтр Ruby
и код ниже, не нужно больше использовать фильтр "json"
input {
stdin{}
}
filter {
grok {
match => ["message","(?<json_raw>.*)"]
}
ruby {
init => "
def parse_json obj, pname=nil, event
obj = JSON.parse(obj) unless obj.is_a? Hash
obj = obj.to_hash unless obj.is_a? Hash
obj.each {|k,v|
p = pname.nil?? k : pname
if v.is_a? Array
v.each_with_index {|oo,ii|
parse_json_array(oo,ii,p,event)
}
elsif v.is_a? Hash
parse_json(v,p,event)
else
p = pname.nil?? k : [pname,k].join('.')
event[p] = v
end
}
end
def parse_json_array obj, i,pname, event
obj = JSON.parse(obj) unless obj.is_a? Hash
pname_ = pname
if obj.is_a? Hash
obj.each {|k,v|
p=[pname_,i,k].join('.')
if v.is_a? Array
v.each_with_index {|oo,ii|
parse_json_array(oo,ii,p,event)
}
elsif v.is_a? Hash
parse_json(v,p, event)
else
event[p] = v
end
}
else
n = [pname_, i].join('.')
event[n] = obj
end
end
"
code => "parse_json(event['json_raw'].to_s,nil,event) if event['json_raw'].to_s.include? ':'"
}
}
output {
stdout{codec => rubydebug}
}
Тестирование структуры json
{"id":123, "members":[{"i":1, "arr":[{"ii":11},{"ii":22}]},{"i":2}], "im_json":{"id":234, "members":[{"i":3},{"i":4}]}}
и это то, что выводит
{
"message" => "{\"id\":123, \"members\":[{\"i\":1, \"arr\":[{\"ii\":11},{\"ii\":22}]},{\"i\":2}], \"im_json\":{\"id\":234, \"members\":[{\"i\":3},{\"i\":4}]}}",
"@version" => "1",
"@timestamp" => "2014-07-25T00:06:00.814Z",
"host" => "Leis-MacBook-Pro.local",
"json_raw" => "{\"id\":123, \"members\":[{\"i\":1, \"arr\":[{\"ii\":11},{\"ii\":22}]},{\"i\":2}], \"im_json\":{\"id\":234, \"members\":[{\"i\":3},{\"i\":4}]}}",
"id" => 123,
"members.0.i" => 1,
"members.0.arr.0.ii" => 11,
"members.0.arr.1.ii" => 22,
"members.1.i" => 2,
"im_json" => 234,
"im_json.0.i" => 3,
"im_json.1.i" => 4
}
Ответ 3
Решением, которое мне понравилось, является рубиновый фильтр, потому что это требует от нас не писать другой фильтр. Однако это решение создает поля, которые находятся в "корне" JSON, и трудно отслеживать, как выглядел оригинальный документ.
Я придумал что-то похожее, которое легче следовать и является рекурсивным решением, поэтому оно чище.
ruby {
init => "
def arrays_to_hash(h)
h.each do |k,v|
# If v is nil, an array is being iterated and the value is k.
# If v is not nil, a hash is being iterated and the value is v.
value = v || k
if value.is_a?(Array)
# "value" is replaced with "value_hash" later.
value_hash = {}
value.each_with_index do |v, i|
value_hash[i.to_s] = v
end
h[k] = value_hash
end
if value.is_a?(Hash) || value.is_a?(Array)
arrays_to_hash(value)
end
end
end
"
code => "arrays_to_hash(event.to_hash)"
}
Он преобразует массивы в каждый номер с номером индекса. Подробнее: - http://blog.abhijeetr.com/2016/11/logstashelasticsearch-best-way-to.html