2016-12-28 2 views
0

Я хочу использовать kafka как входной и logstash как вывод. Я напишу несколько тем в logstash и хочу фильтровать по темам. Я попытался написать код:Как написать фильтр Logstash для фильтрации kafka тем

input { 
    kafka { 
     bootstrap_servers => "localhost:9092" 
     topics => ["test", "payment"] 
     } 
} 

filter { 
    if [topic] = "test" { 
     //do something 
    } else { 
     //do something 
    } 
} 

Но, похоже, это не сработает.

+0

В чем проблема вы столкнулись? Любая ошибка? И вы пропускаете 's' в свой фильтр' topic', который должен быть 'тем 'вместо' topic'? – Kulasangar

ответ

0

Вы должны добавить decorate_events, чтобы добавить kafka.

Возможность добавления метаданных Kafka как темы, размер сообщения для мероприятия. Это добавит поле с именем kafka к событию logstash, содержащему следующие атрибуты: topic: Тема это сообщение связано с consumer_group: группа потребителей, используемая для чтения в этом разделе событий: раздел это сообщение связано со смещением: смещение от раздел это сообщение связанно с ключом: ByteBuffer содержащего сообщение ключа

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events

, а затем обновить конф как этого

input { 
    kafka { 
    bootstrap_servers => "localhost:9092" 
    topics => ["test", "payment"] 
    } 
} 

filter { 
    if [kafka][topic] = "test" { 
    //do something 
    } else { 
    //do something 
    } 
} 
0

Изменения входной часть путем добавления decorate_events добавить kafka поле.

input { 
    kafka { 
     bootstrap_servers => "localhost:9092" 
     topics => ["test", "payment"] 
     decorate_events => true 
    } 
} 

Изменить фильтр часть следующим образом:

filter { 
    if [@metadata][kafka][topic] == "test" { 
     //do something 
    } else { 
     //do something 
    } 
}