2015-03-12 3 views
4

Я написал штормовую топологию. Я в основном хочу отправить кортежи в схему avro в виде байтового массива в тему кафки.Отправить байт массив штурмовать kafka болт

Это, как я поставил болт:

builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>()) 
      .fieldsGrouping(BOLT1, new Fields("key")); 

И это, как я уверен, преобразование в массив байт

Schema schema = avroObject.getSchema(); 

     DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
     writer.write(ping, encoder); 
     encoder.flush(); 
     byte[] message = out.toByteArray(); 
     String key = new String(message, "UTF-8"); 

Когда я испускаю кортеж в следующем, как я не вижу ничего в Кафка тема (отправить поток байтов Кафке):

collector.emit(tuple, new Values(Obj.hashMD5(key), message)); 

, но вместо этого, если преобразовать массив байтов в строку, а затем Кафки Тема это работает:

Что-то, как показано ниже:

builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>()) 
      .fieldsGrouping(BOLT1, new Fields("key")); 

collector.emit(tuple, new Values(Obj.hashMD5(key), key)); 

Что я делаю неправильно? Как отправить поток байтов в тему кафки с помощью штормового кафка-болта?

+0

Пожалуйста, покажите своего продюсера кафки. – Shams

+0

Я использую болт Kafka, предоставленный штормом. См. Builder.setBolt (KAFKA_AVRO_BOLT_NAME, новый KafkaBolt ()) .fieldsGrouping (BOLT1, новые поля («ключ»)); в приведенном выше коде – user2942227

+0

Вы преобразовываете свой bytearray в строку java для создания своего ключа, вы, вероятно, пропустите данные, поскольку строка java не является строкой C. Вы проверили правильность своего ключевого значения? Потому что, если ваш хэшMD5 не будет ошибочным. Неужели это не работает? – zenbeni

ответ

4

У вас есть проблема, потому что ваш MD5 хэш неправилен:

Вы говорите, что если вы преобразовать ByteArray в Java String, это работает: это потому, что значение MD5 правильно в соответствии с String.

collector.emit(tuple, new Values(Obj.hashMD5(key), key)); 

Как вы можете видеть, что MD5 вычисляется на подстроки и вы пошлете строки, соответствующие MD5: все хорошо!

Но если вы отправляете bytearray, вам нужно вычислить MD5 на байтерах, и в результате это будет betearray, а не String. Код:

collector.emit(tuple, new Values(Obj.hashMD5(key), message)); 

неверен как MD5 не соответствует сообщению, но в преобразованном значении сообщения в кодировке UTF-8 в виде строки, которая с потерей данных (см. Ниже)

Вот ссылка на другой вопрос о SO правильно вычислить MD5 в ByteArray, формат:

How can I generate an MD5 hash?

Это происходит потому, что преобразование ByteArray в строку с потерями в Java (в отличие от C) и вы будут пропускать данные в процессе, так как некоторые байты не соответствуют символу в кодировке Java (у вас есть некоторые из них, по-видимому, в ваших данных).

Так что ваш KafkaBolt должен быть

KafkaBolt<byte[], byte[]> 

Я не знаю, достаточно отправить ByteArray MD5 вместе с ByteArray в Кафка шторма.Если это не так, вам придется использовать кодировку, которая без потерь между ByteArray и String Java, такие как BASE64:

Base64 Encoding in Java

Вам придется конвертировать ByteArray в строку base64, используя

KafkaBolt<String, String> 

, а затем передавать данные как обычно

collector.emit(tuple, new Values(Obj.hashMD5(keyInBase64), keyInBase64)); 

это также означает, что, когда вы запрашиваете данные из Кафки, это будет строка в Ь ase64, что вам придется декодировать, чтобы вернуть bytearray.

Надеюсь, что это поможет.

+2

Вам также необходимо убедиться, что для конфигурации установлено значение «serializer.class», «kafka.serializer.DefaultEncoder» для KafkaBolt.KAFKA_BROKER_PROPERTIES. Это значение по умолчанию в Kafka, но оно настроено на StringEncoder в примере KafkaBolt, проект кафки! – MrE