Спасибо всем, кто отвечает на мой вопрос, — в частности, Бену Лиму.
С вашей помощью я получил этот результат.
{
"@version" => "1",
"@timestamp" => "2014-02-20T11:07:28.125Z",
"type" => "syslog",
"host" => "ymkim-SD550",
"path" => "/var/log/syslog",
"ts" => "Feb 20 21:07:27",
"user" => "ymkim",
"func" => "REG",
"8192" => 16,
"8193" => 32,
"8194" => 17,
"8195" => 109
}
для строки, отправленной командой $ logger REG,2000,4,10,20,11,6d
Это мой конфигурационный файл.
# Read the system log; every line becomes an event of type "syslog".
input {
file {
path => "/var/log/syslog"
type => "syslog"
}
}
filter {
# Split the syslog line into timestamp, host, user and the custom
# "func,address,regNumber,regValue" payload written by `logger`.
grok {
match => ["message", "%{SYSLOGTIMESTAMP:ts} %{SYSLOGHOST:hostname} %{WORD:user}: %{WORD:func},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValue}"]
}
# Only REG events carry Modbus register data; expand the CSV payload
# into one field per register (custom plugin below), then drop the
# intermediate fields.
if [func] == "REG" {
modbus_csv {
start_address => "address"
num_register => "regNumber"
source => "regValue"
remove_field => ["regValue", "hostname", "message",
"address", "regNumber"]
}
}
}
output {
# debug => true prints each event as a Ruby hash (legacy Logstash 1.x option).
stdout { debug => true }
elasticsearch { }
}
и модифицированный фильтр csv, названный modbus_csv.rb.
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "csv"
# CSV filter. Takes an event field containing CSV data, parses it,
# and stores it as individual fields (can optionally specify the names).
class LogStash::Filters::MODBUS_CSV < LogStash::Filters::Base
  config_name "modbus_csv"
  milestone 2

  # The CSV data in the value of the source field will be expanded into a
  # datastructure: one event field per Modbus register, keyed by the
  # register's decimal address, holding the register's integer value.
  config :source, :validate => :string, :default => "message"

  # Define a list of column names (in the order they appear in the CSV,
  # as if it were a header line). This filter overwrites entries
  # [0, num_register) with the computed register addresses; names are only
  # used from this list beyond that range.
  config :columns, :validate => :array, :default => []

  # Name of the event field holding the hex-encoded first register address.
  config :start_address, :validate => :string, :default => "0"

  # Name of the event field holding the hex-encoded register count.
  config :num_register, :validate => :string, :default => "0"

  # Column separator. Optional; defaults to a comma ','.
  config :separator, :validate => :string, :default => ","

  # Character used to quote CSV fields. Optional; defaults to '"'.
  config :quote_char, :validate => :string, :default => '"'

  # Target field for placing the parsed data.
  # Defaults to writing to the root of the event.
  config :target, :validate => :string

  public
  def register
    # Nothing to do here
  end # def register

  public
  def filter(event)
    return unless filter?(event)
    @logger.debug("Running modbus_csv filter", :event => event)

    register_count = event[@num_register].to_s.hex
    base_address   = event[@start_address].to_s.hex
    @logger.debug(register_count)

    # One column name per register: base, base+1, ...
    # NOTE: the original used the inclusive range 0..count, producing one
    # extra (unused) column; `times` iterates exactly `register_count` times.
    register_count.times do |i|
      @columns[i] = base_address + i
    end

    if event[@source]
      if event[@source].is_a?(String)
        event[@source] = [event[@source]]
      end
      if event[@source].length > 1
        @logger.warn("modbus_csv filter only works on fields of length 1",
                     :source => @source, :value => event[@source],
                     :event => event)
        return
      end
      raw = event[@source].first
      begin
        values = CSV.parse_line(raw, :col_sep => @separator, :quote_char => @quote_char)
        if @target.nil?
          # Default is to write to the root of the event.
          dest = event
        else
          dest = event[@target] ||= {}
        end
        values.each_index do |i|
          # Fall back to a generic name when no register address exists for
          # this column. (The original wrote `@columns[i].to_s || "..."`,
          # whose fallback was unreachable because to_s never returns nil.)
          field_name = @columns[i] ? @columns[i].to_s : "column#{i + 1}"
          # to_s guards against nil cells (empty CSV columns), which would
          # otherwise raise NoMethodError on #hex.
          dest[field_name] = values[i].to_s.hex
        end
        filter_matched(event)
      rescue => e
        event.tag "_modbus_csvparsefailure"
        @logger.warn("Trouble parsing modbus_csv", :source => @source, :raw => raw,
                     :exception => e)
        return
      end # begin
    end # if event

    @logger.debug("Event after modbus_csv filter", :event => event)
  end # def filter
end # class LogStash::Filters::MODBUS_CSV
Наконец-то я получил диаграмму, которую хотел. (* func = REG (13) 4096 mean per 10m | (13 просмотров))
Количество регистров в строке лога является переменным, а ваш шаблон grok жёстко задан: если регистров больше четырёх, фильтр не сработает. –
Я не знал, что это может меняться. При таком изменении мой фильтр работать не будет, вы правы. – GPPK