2015-02-28 5 views
1

У меня есть следующие: Источник - Кафка тема (транс) канал - память Раковина - HDFS (avro_event)Кафка с Avro записей

Данные в этой теме Кафка транс был написан с использованием переменного тока # производитель и имеет тысячи записей об Avro. Когда я запускаю своего потребителя дымохода, он начинает топить данные в hdf. Проблема заключается в том, что данные в формате: Схема данных схемы данные

вместо:

схема данных данные

Я предполагаю, что это происходит потому, что лотковый ожидает тип записи с {header} {body}, тогда как данные, поступающие из kafka, будут {body} . Я знаю, что вы можете обернуть данные avro, которые были записаны в тему в avroFlumeEvent, но тогда кажется, что это уже не истинный рекорд Avro и, возможно, искровой потребитель или шторм предпочтут t он данные в истинном avro вниз по линии. Есть ли способ обработать эту тему, чтобы данные записывались без использования нескольких схем каждый раз, когда лоток загружает данные в hdfs?

ответ

-1

Рассматривали ли вы использование Camus из LinkedIn, как только вы приземляете данные на kafka. Он выполнит задание mapreduce, но вы должны получить желаемый макет данных данных схемы. Вы также должны посмотреть на стеки kafka Confluence, в частности, на реестр схемы, который он предоставляет, и на остальные api, которые он предоставляет.

0

Мы действительно получили эту работу в конце. Мы использовали библиотеку microsoft .NET avro вместо apro-библиотеки apache в C# -производстве. Это означало, что запись avro была сериализована правильно. Мне также нужно было изменить поглотитель потока, чтобы использовать «org.apache.flume.sink.hdfs.AvroEventSerializer $ Builder» как сериализатор приемника вместо «avro_event». Мне также необходимо было включить перехватчик дымовых газов, подключенный к источнику кафки, который подталкивает переменную «flume.avro.schema.url» в заголовок флюма, который позднее будет использоваться сериализатором сеялки hdfs.

Я смотрел на camus, но это казалось излишним для того, что мы пытались реализовать, базового канала дымохода, связанного с темой кафки, которая поглощает данные avro для hdf.

я просто оторвал немного перехватчика от моего Java приложения, которое строит водовод конфигурации в надежде, что это может помочь другим, которые сталкиваются с этой проблемой:

   _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId +".interceptors",_interceptorId);   
       _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".type","static"); 
       _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".key","flume.avro.schema.url"); 
       _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".value",_avroProdSchemaLocation +_databaseName + "/" + _topic + "/record/" + _schemaVersion + "/" + _topicName + ".avsc");