2017-02-17 29 views
0

Ситуация:Logstash: Если на основе падение на Событии После Сплите

Мои входные журналы выглядеть примерно так:

{ 
"key1":"value1" 
"key2":"value2" 
"events": 
[{ 
"Level":"Information", 
"Code":"100" 
}, 
{ 
"Level":"Information", 
"SomeKey":"SomeValue" 
}, 
{ 
"Level":"Error", 
"Message":"Something went wrong" 
} 
]} 

Я хочу:

  1. разделить массив событий для создания отдельных объектов со всеми полями внешнего уровня, которые все еще заполнены (key1 и key2).

  2. выборочно отбрасывать журналы после разделения. Я только хочу сохранить журналы «Информационный уровень», если они содержат свойство «Код».

Мой logstash конфиг выглядит

filter { 
split { 
    field => "[events]" 
    } 
} 

filter { 
    if ![events][Code] 
    { drop {} } 
} 

output { 
    elasticsearch {} 
} 

Проблема:

Logstash не кажется, чтобы отделить события перед выполнением второго фильтра.

Другими словами, если какое-либо из событий в журнале не имеет поля «Код», весь журнал отбрасывается, включая информацию об уровне «Ошибка», которую я должен сохранить.

Я был на этом целый день, и это действительно нервничает. Я бы вручную попытался создать свой собственный плагин, но я никогда не использовал Ruby.

Я уверен, что это не имеет значения, но я запускаю стек ELK в Docker. Я уверен, что файлы конфигурации загружаются должным образом и что они используются в Logstash.

ответ

0

Это на самом деле похоже на ошибку в логсташе. Вот демонстрация того, что это ошибка:

Файл конфигурации:

input { 
    stdin { codec => "json" } 
} 
filter { 
    split { field => "events" } 
    if ([events] == "" or [events][Code] == "") { 
     drop {} 
    } 
} 
output { 
    stdout { codec => "rubydebug" } 
} 

Командная строка:

echo '{"key1":"value1","key2":"value2","events":[{"Level":"Information","Code":"100"},{"Level":"Information","SomeKey":"SomeValue"},{"Level":"Error","Message":"Something went wrong"}]}' | bin/logstash -f test.conf 

Результат:

Exception in thread "[main]>worker7" java.lang.NumberFormatException: For input string: "Code" 
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) 
    at java.lang.Integer.parseInt(Integer.java:580) 
    at java.lang.Integer.parseInt(Integer.java:615) 
    at org.logstash.Accessors.fetch(Accessors.java:130) 
    at org.logstash.Accessors.get(Accessors.java:20) 
    at org.logstash.Event.getUnconvertedField(Event.java:160) 
    at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_get_field(JrubyEventExtLibrary.java:113) 
    at org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$1$0$ruby_get_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$1$0$ruby_get_field.gen) 

Так в основном он все еще думает, что события является массив и пытается проиндексировать его по «Коду»

Это, кажется, работает вокруг вопроса:

mutate { 
      add_field => ["code", "%{[events][Code]}"] 
    } 
    if ([code] == "%{[events][Code]}") { 
      drop {} 
    } 
    mutate { 
      remove_field => ["code"] 
    } 
+0

Этот хак сделал работу, спасибо! Должен ли я беспокоиться о том, что хак перестанет работать в будущих обновлениях? –

+0

Этот конкретный хак не должен ломаться с новыми версиями ... он копирует поле из одного места в другое, а затем проверяет, является ли оно '% {[events] [Code]}' - это будет точная строка если нет [событий] [Код], потому что logstash не выполняет замену значений, которые не заданы. – Alcanzar