2017-02-11 12 views
1

Я пытаюсь использовать Avro Serialize с Apache kafka для сериализации/десериализации сообщений. Я создаю один продюсер, который используется для сериализации сообщения определенного типа и отправки его в очередь. Когда сообщение успешно отправляется в очередь, наш потребитель выбирает сообщение и пытается обработать, но при попытке мы сталкиваемся с исключением, для байтов для конкретного объекта. Исключение составляет, как показано ниже:Apache Kafka Avro Deserialization: невозможно десериализовать или декодировать сообщение определенного типа.

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer 
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer 
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.lambda$infiniteConsumer$0(AvroSpecificDeserializer.java:51) 
    at java.lang.Iterable.forEach(Iterable.java:75) 
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:46) 
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:63) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 

Согласно исключением, мы используем некоторые inconenient путь для чтения данных, ниже наш код:

Кафка Производитель Код:

static { 
     kafkaProps.put("bootstrap.servers", "localhost:9092"); 
     kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     kafkaProps.put("schema.registry.url", "http://localhost:8081"); 
     kafkaProducer = new KafkaProducer<>(kafkaProps); 
    } 


public static void main(String[] args) throws InterruptedException, IOException { 
     Customer customer1 = new Customer(1002, "Jimmy"); 

     Parser parser = new Parser(); 
     Schema schema = parser.parse(AvroSpecificProducer.class 
       .getClassLoader().getResourceAsStream("avro/customer.avsc")); 

     SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema); 
     try(ByteArrayOutputStream os = new ByteArrayOutputStream()) { 
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null); 
      writer.write(customer1, encoder); 
      encoder.flush(); 

      byte[] avroBytes = os.toByteArray(); 

      ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("CustomerSpecificCountry", 
        "Customer One 11 ", avroBytes 
      ); 

      asyncSend(record1); 
     } 

     Thread.sleep(10000); 
    } 

Кафка Потребитель Код:

static { 
     kafkaProps.put("bootstrap.servers", "localhost:9092"); 
     kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); 
     kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); 
     kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1"); 
     kafkaProps.put("schema.registry.url", "http://localhost:8081"); 
    } 

    public static void infiniteConsumer() throws IOException { 
     try(KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) { 
      kafkaConsumer.subscribe(Arrays.asList("CustomerSpecificCountry")); 

      while(true) { 
       ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100); 
       System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + records.count()); 

       Schema.Parser parser = new Schema.Parser(); 
       Schema schema = parser.parse(AvroSpecificDeserializer.class 
         .getClassLoader().getResourceAsStream("avro/customer.avsc")); 

       records.forEach(record -> { 
        DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema); 
        BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null); 
        try { 
         System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); 
         Customer customer = customerDatumReader.read(null, binaryDecoder); 
         System.out.println(customer); 
        } catch (IOException e) { 
         e.printStackTrace(); 
        } 
       }); 
      } 

     } 
    } 

Использование потребителя в консоли, мы с успехом можем получить сообщение е. Итак, каков способ декодирования сообщения в наших файлах pojo?

ответ

0

Решение этой проблемы, используйте

DatumReader<GenericRecord> customerDatumReader = new SpecificDatumReader<>(schema); 

вместо

`DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema); 

Точная причина этого, до сих пор не найдено. Возможно, потому что Kafka не знает о структуре сообщения, мы явно определяем схему для сообщения, а GenericRecord полезно для преобразования любого сообщения в читаемый формат JSON в соответствии с схемой. После создания JSON мы можем легко преобразовать его в наш класс POJO.

Но все же вам необходимо найти решение для конвертации непосредственно в наш класс POJO.

0

Вам не нужно явно выполнять сериализацию Avro, прежде чем передавать значения в ProduceRecord. Сериализатор сделает это за вас. Ваш код будет выглядеть следующим образом:

Customer customer1 = new Customer(1002, "Jimmy"); 
ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry", customer1); 
    asyncSend(record1); 
} 

Смотрите пример из Confluent для simple producer using avro

+0

Эй @Javier, первый думаю, что мой вопрос связан потребитель не с продюсером. Во-вторых: если вы посмотрите на пример, 'JavaSessionize.avro.LogLine' выглядит как класс avro, поэтому, возможно, они обрабатывают сериализацию для этого. В-третьих: Я использую преобразование определенного типа, а не общее преобразование. Avro поддерживает только 8 типов, в противном случае нам нужно определить полное преобразование схемы. –

+0

@HarmeetSinghTaara Я предложил посмотреть на продюсера, потому что, если вы сериализуете что-то отличное от того, что вы ожидаете, потребитель потерпит неудачу. Я бы посоветовал использовать Conferent-krok-avro-console-consumer, чтобы прочитать это событие и посмотреть, похоже ли оно на то, что вы ожидаете. Если это не удается, тогда проблема не в вашем потребителе. –

+0

@HarmeetSinghTaara 'LogLine' является результатом передачи [этой схемы avro] (https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/resources /avro/LogLine.avsc) через 'maven-avro-plugin', который автоматически генерирует классы Java. Если вы скомпилируете проект, вы можете взглянуть и сравнить его с собственным классом «Клиент». Я не думаю, что они делают сериализацию внутри 'LogLine', я уверен, что это просто POJO. –