2017-02-20 20 views
1

Я пытаюсь передать данные в базу данных MySQL, используя APACHE KAFKA и FLUME. (Вот мой конфигурационный файл желоб)Flume stream to mysql

agent.sources=kafkaSrc 
agent.channels=channel1 
agent.sinks=jdbcSink 

agent.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel 
agent.channels.channel1.brokerList=localhost:9092 
agent.channels.channel1.topic=kafkachannel 
agent.channels.channel1.zookeeperConnect=localhost:2181 
agent.channels.channel1.capacity=10000 
agent.channels.channel1.transactionCapacity=1000 


agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource 
agent.sources.kafkaSrc.channels = channel1 
agent.sources.kafkaSrc.zookeeperConnect = localhost:2181 
agent.sources.kafkaSrc.topic = kafka-mysql 

***agent.sinks.jdbcSink.type = How to declare this?*** 
agent.sinks.jdbcSink.connectionString = jdbc:mysql://1.1.1.1:3306/test 
agent.sinks.jdbcSink.username=user 
agent.sinks.jdbcSink.password=password 
agent.sinks.jdbcSink.batchSize = 10 
agent.sinks.jdbcSink.channel =channel1 
agent.sinks.jdbcSink.sqlDialect=MYSQL 
agent.sinks.jdbcSink.driver=com.mysql.jdbc.Driver 
agent.sinks.jdbcSink.sql=(${body:varchar}) 

Я знаю, как поток данных в Hadoop или HBase (тип регистратора или тип HDFS), однако не может найти тип потоковой передачи в БД MySQL. Итак, мой вопрос в том, как я объявляю jdbcSink.type?

+0

В коробке нет раковины JDBC. Вы не можете передавать данные в MySQL с помощью Flume. – franklinsijo

+0

@franklinsijo Спасибо за ваш ответ. Тогда можно ли вытащить данные из Кафки в РСУБД? Я открыт для любого предложения. – SLIT

+0

Невозможно ли с помощью [Kafka JDBC Sink] (http://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html)? – franklinsijo

ответ

0

Вы всегда можете создать пользовательскую раковину для MySQL. Это то, что мы сделали на FIWARE с помощью инструмента Cygnus.

Вы можете получить вдохновение от него: https://github.com/telefonicaid/fiware-cygnus/blob/master/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMySQLSink.java

Он расширяет этот другой пользовательский базовый класс для всех наших моек: https://github.com/telefonicaid/fiware-cygnus/blob/master/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java

В принципе, вы должны расширить AbstractSink и реализовать интерфейс Configurable. Это означает, что переопределить аль крайней мере, следующие методы:

public Status process() throws EventDeliveryException 

и:

public void configure(Context context) 

соответственно.

+0

Большое вам спасибо. – SLIT