1

Всякий раз, когда я пытаюсь прочитать сообщение из очереди kafka, я получаю следующее исключение:

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer 
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer 
     at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79) 
     at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 

Кафка Производитель Код:

public class AvroSpecificProducer { 
    private static Properties kafkaProps = new Properties(); 
    private static KafkaProducer<String, Customer> kafkaProducer; 

    static { 
     kafkaProps.put("bootstrap.servers", "localhost:9092"); 
     kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     kafkaProps.put("schema.registry.url", "http://localhost:8081"); 
     kafkaProducer = new KafkaProducer<>(kafkaProps); 
    } 

    public static void fireAndForget(ProducerRecord<String, Customer> record) { 
     kafkaProducer.send(record); 
    } 

    public static void asyncSend(ProducerRecord<String, Customer> record) { 
     kafkaProducer.send(record, (recordMetaData, ex) -> { 
      System.out.println("Offset: "+ recordMetaData.offset()); 
      System.out.println("Topic: "+ recordMetaData.topic()); 
      System.out.println("Partition: "+ recordMetaData.partition()); 
      System.out.println("Timestamp: "+ recordMetaData.timestamp()); 
     }); 
    } 

    public static void main(String[] args) throws InterruptedException, IOException { 
     Customer customer1 = new Customer(1002, "Jimmy"); 
     ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry", 
       "Customer One 11 ", customer1 
     ); 

     asyncSend(record1); 

     Thread.sleep(1000); 
    } 
} 

Кафка потребительского кода:

public class AvroSpecificDeserializer { 

    private static Properties kafkaProps = new Properties(); 

    static { 
     kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1"); 
     kafkaProps.put("zookeeper.connect", "localhost:2181"); 
     kafkaProps.put("schema.registry.url", "http://localhost:8081"); 
    } 

    public static void infiniteConsumer() throws IOException { 
     VerifiableProperties properties = new VerifiableProperties(kafkaProps); 
     KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(properties); 
     KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties); 

     Map<String, Integer> topicCountMap = new HashMap<>(); 
     topicCountMap.put("NewTopic", 1); 

     ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps)); 
     Map<String, List<KafkaStream<Object, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder); 

     KafkaStream stream = consumerMap.get("NewTopic").get(0); 
     ConsumerIterator it = stream.iterator(); 

     System.out.println("???????????????????????????????????????????????? "); 
     while (it.hasNext()) { 
      System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "); 
      MessageAndMetadata messageAndMetadata = it.next(); 
      String key = (String) messageAndMetadata.key(); 
      GenericRecord record = (GenericRecord) messageAndMetadata.message(); 
      Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record); 
      System.out.println("Key: " + key); 
      System.out.println("Value: " + customer); 
     } 

    } 

    public static void main(String[] args) throws IOException { 
     infiniteConsumer(); 
    } 
} 

Я выполняю, этот пример s:

  1. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/java/io/confluent/examples/producer/AvroClicksProducer.java
  2. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java

ответ

1

Это окончательный код, который будет работать, после обсуждения с @harmeen

static { 
    kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest"); 
    kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1"); 
    kafkaProps.put("zookeeper.connect", "localhost:2181"); 
    kafkaProps.put("schema.registry.url", "http://localhost:8081"); 
    kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 
} 

public static void infiniteConsumer() throws IOException { 

VerifiableProperties properties = new VerifiableProperties(kafkaProps); 
StringDecoder keyDecoder = new StringDecoder(properties); 
KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties); 

Map<String, Integer> topicCountMap = new HashMap<>(); 
topicCountMap.put("BrandNewTopics", 1); 

ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps)); 
Map<String, List<KafkaStream<String, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder); 

KafkaStream stream = consumerMap.get("BrandNewTopics").get(0); 
ConsumerIterator it = stream.iterator(); 

while (it.hasNext()) { 
    MessageAndMetadata messageAndMetadata = it.next(); 
    String key = (String) messageAndMetadata.key(); 
    GenericRecord record = (GenericRecord) messageAndMetadata.message(); 
    Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record); 
    System.out.println("Key: " + key); 
    System.out.println("Value: " + customer); 
} 

Вещи, которые получили изменения:

  • Добавление SPECIFIC_AVRO_READER_CONFIG свойство true.
  • Использование наименьшее для начала с начала темы.
  • Использование StringSerializer и StringDeserializer для ключей.
  • Изменение как производителя, так и потребителя с учетом предыдущего изменения
  • Отрегулируйте пространство имен для класса Customer, которое представляет запись Avro.
+0

Если это не исправить, рассмотрите следующие шаги: 1. Убедитесь, что ваш реестр схем содержит схему для соответствующего объекта. 2. Используйте 'kafka-avro-console-consumer', чтобы использовать ваше событие. Это ограничило бы объем проблемы как вашим производителем, так и вашим потребителем. –

+0

Эй @ Javier, я использую './kafka-avro-console-consumer --bootstrap-server localhost: 2181 --topic CustomerSpecificCountry - from-begin --programty schema.registry.url = http: // localhost: 8081 'для выполнения потребителя, но ничего не получено этим потребителем, что-то не так с моей командой? –

+0

bootstrap-server должен указывать на одного или нескольких ваших брокеров kafka, а не на ваш узел zookeeper. –

 Смежные вопросы

  • Нет связанных вопросов^_^