Я пытаюсь использовать Avro Serialize с Apache kafka для сериализации/десериализации сообщений. Я создаю один продюсер, который используется для сериализации сообщения определенного типа и отправки его в очередь. Когда сообщение успешно отправляется в очередь, наш потребитель выбирает сообщение и пытается обработать, но при попытке мы сталкиваемся с исключением, для байтов для конкретного объекта. Исключение составляет, как показано ниже:Apache Kafka Avro Deserialization: невозможно десериализовать или декодировать сообщение определенного типа.
[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.lambda$infiniteConsumer$0(AvroSpecificDeserializer.java:51)
at java.lang.Iterable.forEach(Iterable.java:75)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:46)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Согласно исключением, мы используем некоторые inconenient путь для чтения данных, ниже наш код:
Кафка Производитель Код:
static {
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put("schema.registry.url", "http://localhost:8081");
kafkaProducer = new KafkaProducer<>(kafkaProps);
}
public static void main(String[] args) throws InterruptedException, IOException {
Customer customer1 = new Customer(1002, "Jimmy");
Parser parser = new Parser();
Schema schema = parser.parse(AvroSpecificProducer.class
.getClassLoader().getResourceAsStream("avro/customer.avsc"));
SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
try(ByteArrayOutputStream os = new ByteArrayOutputStream()) {
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
writer.write(customer1, encoder);
encoder.flush();
byte[] avroBytes = os.toByteArray();
ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("CustomerSpecificCountry",
"Customer One 11 ", avroBytes
);
asyncSend(record1);
}
Thread.sleep(10000);
}
Кафка Потребитель Код:
static {
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
kafkaProps.put("schema.registry.url", "http://localhost:8081");
}
public static void infiniteConsumer() throws IOException {
try(KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
kafkaConsumer.subscribe(Arrays.asList("CustomerSpecificCountry"));
while(true) {
ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100);
System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + records.count());
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(AvroSpecificDeserializer.class
.getClassLoader().getResourceAsStream("avro/customer.avsc"));
records.forEach(record -> {
DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema);
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
try {
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
Customer customer = customerDatumReader.read(null, binaryDecoder);
System.out.println(customer);
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
}
Использование потребителя в консоли, мы с успехом можем получить сообщение е. Итак, каков способ декодирования сообщения в наших файлах pojo?
Эй @Javier, первый думаю, что мой вопрос связан потребитель не с продюсером. Во-вторых: если вы посмотрите на пример, 'JavaSessionize.avro.LogLine' выглядит как класс avro, поэтому, возможно, они обрабатывают сериализацию для этого. В-третьих: Я использую преобразование определенного типа, а не общее преобразование. Avro поддерживает только 8 типов, в противном случае нам нужно определить полное преобразование схемы. –
@HarmeetSinghTaara Я предложил посмотреть на продюсера, потому что, если вы сериализуете что-то отличное от того, что вы ожидаете, потребитель потерпит неудачу. Я бы посоветовал использовать Conferent-krok-avro-console-consumer, чтобы прочитать это событие и посмотреть, похоже ли оно на то, что вы ожидаете. Если это не удается, тогда проблема не в вашем потребителе. –
@HarmeetSinghTaara 'LogLine' является результатом передачи [этой схемы avro] (https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/resources /avro/LogLine.avsc) через 'maven-avro-plugin', который автоматически генерирует классы Java. Если вы скомпилируете проект, вы можете взглянуть и сравнить его с собственным классом «Клиент». Я не думаю, что они делают сериализацию внутри 'LogLine', я уверен, что это просто POJO. –