Я написал штормовую топологию. Я в основном хочу отправить кортежи в схему avro в виде байтового массива в тему кафки.Отправить байт массив штурмовать kafka болт
Это, как я поставил болт:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
.fieldsGrouping(BOLT1, new Fields("key"));
И это, как я уверен, преобразование в массив байт
Schema schema = avroObject.getSchema();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(ping, encoder);
encoder.flush();
byte[] message = out.toByteArray();
String key = new String(message, "UTF-8");
Когда я испускаю кортеж в следующем, как я не вижу ничего в Кафка тема (отправить поток байтов Кафке):
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
, но вместо этого, если преобразовать массив байтов в строку, а затем Кафки Тема это работает:
Что-то, как показано ниже:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
.fieldsGrouping(BOLT1, new Fields("key"));
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
Что я делаю неправильно? Как отправить поток байтов в тему кафки с помощью штормового кафка-болта?
Пожалуйста, покажите своего продюсера кафки. – Shams
Я использую болт Kafka, предоставленный штормом. См. Builder.setBolt (KAFKA_AVRO_BOLT_NAME, новый KafkaBolt()) .fieldsGrouping (BOLT1, новые поля («ключ»)); в приведенном выше коде –
user2942227
Вы преобразовываете свой bytearray в строку java для создания своего ключа, вы, вероятно, пропустите данные, поскольку строка java не является строкой C. Вы проверили правильность своего ключевого значения? Потому что, если ваш хэшMD5 не будет ошибочным. Неужели это не работает? – zenbeni