2013-06-27 4 views
0

Я пытаюсь настроить один рабочий стол, который принимает все сообщения из одной очереди amqp/rabbitmq, фильтрует некоторые сообщения для отправки в statsD, а также отправляет ВСЕ сообщения в список поиска. Следующая реализация не отправляет ЛЮБЫЕ сообщения в ElasticSearch.Как настроить рабочий процесс 1: N с фильтрами в Logstash?

input { 
    rabbitmq { 
    host => "amqp-host" 
    queue => "elasticsearch" 
    key => "elasticsearch" 
    exchange => "elasticsearch" 
    type => "all" 
    durable => true 
    auto_delete => false 
    exclusive => false 
    format => "json_event" 
    debug => false 
    } 
} 

filter { 
    grep { 
     add_tag => "grepped" 
     match => ["@message", "Execution of .*? took .* sec"] 
    } 

    grok { 
     tags => ["grepped"] 
     add_tag => "grokked" 
     pattern => "Execution of %{DATA:command_name} took %{DATA:response_time} sec" 
    } 

    mutate { 
     tags => ["grepped", "grokked"] 
     lowercase => [ "command_name" ] 
     add_tag => ["mutated"] 
    } 
} 

output { 
    elasticsearch_river { 
    type => "all" 
    rabbitmq_host => "amqp-host" 
    debug => false 
    durable => true 
    persistent => true 
    es_host => "es-host" 
    exchange => "logstash-elasticsearch" 
    exchange_type => "direct" 
    index => "logs-%{+YYYY.MM.dd}" 
    index_type => "%{@type}" 
    queue => "logstash-elasticsearch" 
    } 

statsd { 
    type => "command-filter" 
    tags => ["grepped", "grokked", "mutated"] 
    host => "some.domain.local" 
    port => 1234 
    sender => "" 
    namespace => "" 
    timing => ["prefix.%{command_name}.suffix", "%{response_time}"] 
    increment => ["prefix.%{command_name}.suffix"] 
    } 
} 

Есть ли какой-нибудь фильтр? Или способ упорядочить теги, чтобы некоторые сообщения были отфильтрованы, а ВСЕ перенаправлены в ES?

ответ

1

Фильтр clone пригодится. Ниже приведен мой конфигурационный файл.

input { 
    rabbitmq { 
    host => "amqp-host" 
    queue => "elasticsearch" 
    key => "elasticsearch" 
    exchange => "elasticsearch" 
    type => "all" 
    durable => true 
    auto_delete => false 
    exclusive => false 
    format => "json_event" 
    debug => false 
    } 
} 

filter { 
    clone { 
     exclude_tags => ["cloned"] 
     clones => ["statsd", "elastic-search"] 
     add_tag => ["cloned"] 
    } 

    grep { 
     type => "statsd" 
     add_tag => "grepped" 
     match => ["@message", "Execution of .*Command took .* sec"] 
    } 

    grok { 
     type => "statsd" 
     tags => ["grepped"] 
     add_tag => "grokked" 
     pattern => "Execution of %{DATA:command_name}Command took %{DATA:response_time} sec" 
    } 

    mutate { 
     type => "statsd" 
     tags => ["grepped", "grokked"] 
     lowercase => [ "command_name" ] 
     add_tag => ["mutated"] 
    } 
} 

output { 
    elasticsearch_river { 
    type => "all" 
    rabbitmq_host => "amqp-host" 
    debug => false 
    durable => true 
    persistent => true 
    es_host => "es-host" 
    exchange => "logstash-elasticsearch" 
    exchange_type => "direct" 
    index => "logs-%{+YYYY.MM.dd}" 
    index_type => "%{@type}" 
    queue => "logstash-elasticsearch" 
    } 

    statsd { 
    type => "statsd" 
    tags => ["grepped", "grokked", "mutated"] 
    host => "some.host.local" 
    port => 1234 
    sender => "" 
    namespace => "" 
    timing => ["commands.%{command_name}.responsetime", "%{response_time}"] 
    increment => ["commands.%{command_name}.requests"] 
    } 
} 
+0

Не могли бы вы расширить на то, что клон делает здесь? действительно ли порядок элементов в фильтре? – FuzzyAmi

+1

@FuzzyAmi Прошло некоторое время, я быстро взглянул на документы. Фильтр «clone» создает новые события журнала. Однако каждое «клонированное» событие снова проходит через конвейер ENTIRE. Итак, я клонирую каждое сообщение, когда оно появляется, и добавляю «клонированный» тег с помощью 'add_tag ​​=> ['cloned']', а затем, когда он снова появляется, 'exclude_tags => ['cloned']' гарантирует Я не заканчиваю бесконечным циклом клонированных событий. 'clones => ['statsd', 'elastic-search']' key/value создает 2 клонированных события по одному с каждым тегом. Думайте об этом как о ветвящемся механизме для последующей обработки. – TCopple

+0

благодарит за разъяснения! – FuzzyAmi

0

Фильтр clone на самом деле не нужен в вашей ситуации.

Есть несколько вещей, которые я бы рекомендовал:

  • Пожалуйста упростить конфигурации, пока вы не получите «базу» работает. Не устанавливать дополнительные параметры теперь
  • Убедитесь type совпадала
  • Используйте теги, они спасатель
  • Когда «оглавлению» Вы должны оставаться последовательным и всегда использовать grok или всегда использовать grep. Я предпочитаю grok

Некоторые другие советы ... Выберите то, что вы хотите сделать с вашими сообщениями на основе того, что теги они установили. Ниже приведен пример моего конфигурационного фрагмента:

grok{ 
    type => "company" 
    pattern => ["((status:new))"] 
    add_tag => ["company-vprod-status-new", "company-vprod-status-new-%{company}", "company-vprod-status-new-%{candidate_username}"] 
    tags=> "company-vprod-status-change" 
} 
output { 
elasticsearch { 
    host => "127.0.0.1" 
    type => "company" 
    index => "logstash-syslog-%{+YYYY.MM.dd}" 
} 
statsd { 
    host => "graphite.test.company.com" 
    increment => ["vprod.statuses.new.all", "vprod.statuses.new.%{company}.all"] 
    tags => ["company-vprod-status-new"] 
} 
} 

Также обратите особое внимание на ваш тип. Если для атрибута type установлено значение, которое не существует, то этот блок никогда не будет запущен. Вот почему я предпочитаю использовать теги, если нет явной причины использовать типы.

+0

Все замечательные советы, но ответ не дает реального ответа на мою проблему. Проблема заключалась в том, что все сообщения проходили через фильтр grep, а те, которые его не пропускали, просто удалялись. – TCopple

+0

Кажется, что была еще одна проблема, чем то, о чем вы только что упоминали. Ваш блок ES был настроен на то, чтобы принимать все сообщения из типа «все», а не на основе тегов.Поэтому все, что у вас было, было правильным, и то, что вы упомянули, не похоже на преступника. Если ваше решение сработало, это действительно здорово, но на основе информации, которую вы предоставили, это не похоже на решение, которое даже можно было бы рассмотреть. Может быть, вы можете отредактировать свой ответ, чтобы объяснить, почему ваше решение работает и что оно решило? – Adam

0

Вы можете также добавить:

drop => false 

в конце Grep строфы (если вы все еще используете Grep, что есть)

 Смежные вопросы

  • Нет связанных вопросов^_^