2014-02-19 3 views
1

Я очень новичок в logstash. Я могу запустить файл журнала logstash и посмотреть веб-страницу kibana. Это круто ~~Как получить номера из сообщения журнала в logstash?

Теперь я хочу изменить следующую строку (сообщение syslog) на следующую строку.

Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40 
==> 
{ 'timestamp': 'Feb 19 18:45:29', 
    'host': 'SD550', 0x1000:10, 0x1001:20, 0x1002:30, 0x1003:40 } 

В журнале сообщений, «0x1000» представляет собой адрес регистра, начиная, «4» представляет собой число значений регистров, и последующие значения являются лишь значение. Таким образом, это означает, что 0x1000: 10, 0x1001: 20, 0x1002: 30, 0x1003: 40. Важным моментом является то, что количество регистровых значений может меняться. В результате длина сообщения журнала может быть переменной. Несмотря на то, что он имеет какую-то длину, я хотел бы получить правильный результат. (например, 0x2000,2,12,22 ==> 0x2000: 12, 0x2001: 22)

Это мой незавершенный файл конфигурации для logstash. Я нашел некоторые фильтры, такие как grok, mutate и extractnumbers. Но я не знаю, как делать то, что я хочу делать.

input { 
    file { 
     path => "/var/log/syslog" 
     type => "syslog" 
    } 
} 

filter { 
    ??? 
} 

output { 
    elasticsearch { } 
} 

Я знаю, что хочу многого, извините, ребята. Кроме того, моя конечная цель - нарисовать график TIME (x)/VALUE (y) для конкретного регистра в кибане. Является ли это возможным? Могу ли я получить от вас некоторые советы?

Спасибо, Youngmin Ким

ответ

0

Спасибо всем, кто отвечает на мой вопрос .. В частности, Бен Лим.

С вашей помощью я получил этот результат.

{ 
     "@version" => "1", 
    "@timestamp" => "2014-02-20T11:07:28.125Z", 
      "type" => "syslog", 
      "host" => "ymkim-SD550", 
      "path" => "/var/log/syslog", 
      "ts" => "Feb 20 21:07:27", 
      "user" => "ymkim", 
      "func" => "REG", 
      "8192" => 16, 
      "8193" => 32, 
      "8194" => 17, 
      "8195" => 109 
} 

из $ logger REG,2000,4,10,20,11,6d

Это мой конфигурационный файл.

input { 
    file { 
     path => "/var/log/syslog" 
     type => "syslog" 
    } 
} 

filter { 
    grok { 
     match => ["message", "%{SYSLOGTIMESTAMP:ts} %{SYSLOGHOST:hostname} %{WORD:user}: %{WORD:func},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValue}"] 
    } 

    if [func] == "REG" { 
     modbus_csv { 
      start_address => "address" 
      num_register => "regNumber" 
      source => "regValue" 
      remove_field => ["regValue", "hostname", "message", 
       "address", "regNumber"] 
     } 
    } 

} 

output { 
    stdout { debug => true } 
    elasticsearch { } 
} 

и модифицированный фильтр csv, названный modbus_csv.rb.

# encoding: utf-8 
require "logstash/filters/base" 
require "logstash/namespace" 

require "csv" 

# CSV filter. Takes an event field containing CSV data, parses it, 
# and stores it as individual fields (can optionally specify the names). 
class LogStash::Filters::MODBUS_CSV < LogStash::Filters::Base 
    config_name "modbus_csv" 
    milestone 2 

    # The CSV data in the value of the source field will be expanded into a 
    # datastructure. 
    config :source, :validate => :string, :default => "message" 

    # Define a list of column names (in the order they appear in the CSV, 
    # as if it were a header line). If this is not specified or there 
    # are not enough columns specified, the default column name is "columnX" 
    # (where X is the field number, starting from 1). 
    config :columns, :validate => :array, :default => [] 
    config :start_address, :validate => :string, :default => "0" 
    config :num_register, :validate => :string, :default => "0" 

    # Define the column separator value. If this is not specified the default 
    # is a comma ','. 
    # Optional. 
    config :separator, :validate => :string, :default => "," 

    # Define the character used to quote CSV fields. If this is not specified 
    # the default is a double quote '"'. 
    # Optional. 
    config :quote_char, :validate => :string, :default => '"' 

    # Define target for placing the data. 
    # Defaults to writing to the root of the event. 
    config :target, :validate => :string 

    public 
    def register 

    # Nothing to do here 

    end # def register 

    public 
    def filter(event) 
    return unless filter?(event) 

    @logger.debug("Running modbus_csv filter", :event => event) 

    matches = 0 

    @logger.debug(event[@num_register].hex) 
    for i in 0..(event[@num_register].hex) 
     @columns[i] = event[@start_address].hex + i 
    end 
    if event[@source] 
     if event[@source].is_a?(String) 
     event[@source] = [event[@source]] 
     end 

     if event[@source].length > 1 
     @logger.warn("modbus_csv filter only works on fields of length 1", 
        :source => @source, :value => event[@source], 
        :event => event) 
     return 
     end 

     raw = event[@source].first 
     begin 
     values = CSV.parse_line(raw, :col_sep => @separator, :quote_char => @quote_char) 

     if @target.nil? 
      # Default is to write to the root of the event. 
      dest = event 
     else 
      dest = event[@target] ||= {} 
     end 

     values.each_index do |i| 
      field_name = @columns[i].to_s || "column#{i+1}" 
      dest[field_name] = values[i].hex 
     end 

     filter_matched(event) 
     rescue => e 
     event.tag "_modbus_csvparsefailure" 
     @logger.warn("Trouble parsing modbus_csv", :source => @source, :raw => raw, 
         :exception => e) 
     return 
     end # begin 
    end # if event 

    @logger.debug("Event after modbus_csv filter", :event => event) 

    end # def filter 

end # class LogStash::Filters::Csv 

Наконец-то я получил диаграмму, что хочу. (* func = REG (13) 4096 mean per 10m | (13 просмотров))

0

Вы хотите использовать ГРКИ, чтобы соответствовать различным полям, существует целый ряд встроенных в шаблонах ГРОК, которые помогут вам в этом. % {SYSLOGBASE} получит временную метку и хост для вас, а затем остальные могут быть захвачены такими шаблонами, как% {NUMBER} и другие, найденные по адресу https://github.com/logstash/logstash/blob/v1.3.3/patterns/grok-patterns

Из-за вашей переменной длины журнала ваши шаблоны могут получить немного сложный, однако я думаю, что вы можете уйти, просто сопоставив все числа и скопировав их в массив, а затем в своем мутате вы можете сопоставить их с значением регистра.

Что касается создания графика в кибане, это не будет очень сложно, как только ваши данные будут правильно отформатированы. Существует встроенный граф временного ряда, который легко заполнить.

0

19 февраля 18:45:29 SD550 Джек: REG, 0x1000,4,10,20,30,40

если использовать следующий конфигурационный файл на данных, который выглядит, как выше, и открытый kibana, вы это работает. Он разбивает поля на разные категории, на которые вы можете искать. Я новичок во всем этом, но я так и сделаю. Скриншот ниже, а простой временной круговой диаграммы после того, как я поставил около 8 строк выше с помощью другого времени и значением адреса

input { 
    tcp { 
    type => "test" 
    port => 3333 
    } 
} 


filter { 
    grok { 
     match => ["message", "%{MONTH:month} %{DAY:day} %{TIME:time} %{WORD:sd550} %{WORD:name}: %{WORD:asmThing},%{WORD:address},%{NUMBER:firstno}%{NUMBER:2no}%{NUMBER:3no}%{NUMBER:4no}%{NUMBER:5no}"] 
} 
} 
output { 
    elasticsearch { 
    # Setting 'embedded' will run a real elasticsearch server inside logstash. 
    # This option below saves you from having to run a separate process just 
    # for ElasticSearch, so you can get started quicker! 
    embedded => true 
    } 
} 

Test Kibana

+0

Длина файла регистра является переменной. Но ваш фильтр grok исправлен! Если имеется более 4 регистров, ваш фильтр не будет выполнен. –

+0

Я не знал, что это изменилось. Мой фильтр не будет работать для изменения, вы правы. – GPPK

0

У меня есть одна идея. Чтобы обрабатывать длину журнала журнала с несколькими адресами регистра: значение, , вы можете использовать фильтр grok для фильтрации сообщения. Затем используйте фильтр csv для разделения каждого значения регистра.

Фильтр:

filter { 
    grok { 
      match => ["message", "%{MONTH:month} %{NUMBER:day} %{TIME:time} %{WORD:host} %{WORD:user}: %{WORD:unit},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValue}"] 
      add_field => ["logdate","%{month} %{day} %{time}"] 
      remove_field => ["month","day", "time"] 
    } 

    csv { 
      source => "regValue" 
      remove_field => ["regValue"] 
    } 
} 

Выход:

{ 
    "message" => "Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40", 
    "@version" => "1", 
"@timestamp" => "2014-02-20T02:05:53.608Z", 
     "host" => "SD550" 
     "user" => "Jack", 
     "unit" => "REG", 
    "address" => "0x1000", 
"regNumber" => "4", 
    "logdate" => "Feb 19 18:45:29", 
    "column1" => "10", 
    "column2" => "20", 
    "column3" => "30", 
    "column4" => "40" 
} 

Однако имя поля адреса задается CSV фильтра (Вы не можете дать имя поля CSV filter column, так как количество поля переменная). Если вы хотите выполнить свое требование, вам необходимо изменить csv filter.