2016-06-03 2 views
9

Я использую logstash jdbc, чтобы синхронизировать вещи между mysql и elasticsearch. Его работа отлично подходит для одного стола. Но теперь я хочу сделать это для нескольких таблиц. Мне нужно открыть несколько в терминаленесколько входов на logstash jdbc

logstash agent -f /Users/logstash/logstash-jdbc.conf 

каждый с запросом на выборку или же у нас есть лучший способ сделать это, чтобы мы могли иметь несколько таблиц обновляется ли.

моего конфигурационный файл

input { 
    jdbc { 
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver" 
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name" 
    jdbc_user => "root" 
    jdbc_password => "password" 
    schedule => "* * * * *" 
    statement => "select * from table1" 
    } 
} 
output { 
    elasticsearch { 
     index => "testdb" 
     document_type => "table1" 
     document_id => "%{table_id}" 
     hosts => "localhost:9200" 
    } 
} 
+0

Вы можете иметь один конфиг с несколькими 'jdbc' ввода, а затем Параметризует' 'index' и document_type' в вашем' elasticsearch' выхода в зависимости от того, какой таблицы события откуда. – Val

+0

любой пример или образец у вас есть? – Autolycus

ответ

21

Вы можете определенно один конфиг с множественным jdbc вводом, а затем Параметризуют index и document_type в вашем elasticsearch выходе в зависимости от того, какой таблицы события откуда.

input { 
    jdbc { 
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver" 
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name" 
    jdbc_user => "root" 
    jdbc_password => "password" 
    schedule => "* * * * *" 
    statement => "select * from table1" 
    type => "table1" 
    } 
    jdbc { 
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver" 
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name" 
    jdbc_user => "root" 
    jdbc_password => "password" 
    schedule => "* * * * *" 
    statement => "select * from table2" 
    type => "table2" 
    } 
    # add more jdbc inputs to suit your needs 
} 
output { 
    elasticsearch { 
     index => "testdb" 
     document_type => "%{type}" # <- use the type from each input 
     hosts => "localhost:9200" 
    } 
} 
+0

hmm ... Я думаю, что проблема будет document_id => "% {table_id}", если я не могу автоматически генерировать уникальный document_id – Autolycus

+0

Можете ли вы подробно рассказать? У вас есть разные поля ID для таблицы, которую вы хотели бы использовать в качестве идентификатора документа? – Val

+0

да, поэтому у меня разные идентификаторы, и у меня есть elasticsearch create ID вместо того, чтобы пытаться использовать уже существующий mysql IDS – Autolycus

0

Это не будет создавать повторяющиеся данные. и совместимый logstash 6x.

# YOUR_DATABASE_NAME : test 
# FIRST_TABLE : place 
# SECOND_TABLE : things  
# SET_DATA_INDEX : test_index_1, test_index_2 

input { 
    jdbc { 
     # The path to our downloaded jdbc driver 
     jdbc_driver_library => "/mysql-connector-java-5.1.44-bin.jar" 
     jdbc_driver_class => "com.mysql.jdbc.Driver" 
     # Postgres jdbc connection string to our database, YOUR_DATABASE_NAME 
     jdbc_connection_string => "jdbc:mysql://localhost:3306/test" 
     # The user we wish to execute our statement as 
     jdbc_user => "root" 
     jdbc_password => "" 
     schedule => "* * * * *" 
     statement => "SELECT @slno:[email protected]+1 aut_es_1, es_qry_tbl.* FROM (SELECT * FROM `place`) es_qry_tbl, (SELECT @slno:=0) es_tbl" 
     type => "place" 
     add_field => { "queryFunctionName" => "getAllDataFromFirstTable" } 
     use_column_value => true 
     tracking_column => "aut_es_1" 
    } 

    jdbc { 
     # The path to our downloaded jdbc driver 
     jdbc_driver_library => "/mysql-connector-java-5.1.44-bin.jar" 
     jdbc_driver_class => "com.mysql.jdbc.Driver" 
     # Postgres jdbc connection string to our database, YOUR_DATABASE_NAME 
     jdbc_connection_string => "jdbc:mysql://localhost:3306/test" 
     # The user we wish to execute our statement as 
     jdbc_user => "root" 
     jdbc_password => "" 
     schedule => "* * * * *" 
     statement => "SELECT @slno:[email protected]+1 aut_es_2, es_qry_tbl.* FROM (SELECT * FROM `things`) es_qry_tbl, (SELECT @slno:=0) es_tbl" 
     type => "things" 
     add_field => { "queryFunctionName" => "getAllDataFromSecondTable" } 
     use_column_value => true 
     tracking_column => "aut_es_2" 
    } 
} 

# install uuid plugin 'bin/logstash-plugin install logstash-filter-uuid' 
# The uuid filter allows you to generate a UUID and add it as a field to each processed event. 

filter { 

    mutate { 
      add_field => { 
        "[@metadata][document_id]" => "%{aut_es_1}%{aut_es_2}" 
      } 
    } 

    uuid { 
     target => "uuid" 
     overwrite => true 
    }  
} 

output { 
    stdout {codec => rubydebug} 
    if [type] == "place" { 
     elasticsearch { 
      hosts => "localhost:9200" 
      index => "test_index_1_12" 
      #document_id => "%{aut_es_1}" 
      document_id => "%{[@metadata][document_id]}" 
     } 
    } 
    if [type] == "things" { 
     elasticsearch { 
      hosts => "localhost:9200" 
      index => "test_index_2_13" 
      document_id => "%{[@metadata][document_id]}" 
      # document_id => "%{aut_es_2}" 
      # you can set document_id . otherwise ES will genrate unique id. 
     } 
    } 
}