2017-02-10 13 views
2

Я использую Apache Kafka с Avro Serializer, используя определенный формат. Я пытаюсь создать свой собственный пользовательский класс и использовать его как значение сообщения kafka. Но когда я пытаюсь отправить сообщение, я получаю следующее исключение:Kafka Avro Сериализатор: org.apache.avro.AvroRuntimeException: не открыт

Exception in thread "main" org.apache.avro.AvroRuntimeException: not open 
    at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82) 
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:287) 
    at com.harmeetsingh13.java.producers.avroserializer.AvroProducer.main(AvroProducer.java:57) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 

Мои Avro схемы файл, как показано ниже: Класс

{ 
    "namespace": "customer.avro", 
    "type": "record", 
    "name": "Customer", 
    "fields": [{ 
     "name": "id", 
     "type": "int" 
    }, { 
     "name": "name", 
     "type": "string" 
    }] 
} 

клиентов:

public class Customer { 
    public int id; 
    public String name; 

    public Customer() { 
    } 

    public Customer(int id, String name) { 
     this.id = id; 
     this.name = name; 
    } 

    public int getId() { 
     return id; 
    } 

    public void setId(int id) { 
     this.id = id; 
    } 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 
} 

Данные Сериализация с использованием Avro :

public static void fireAndForget(ProducerRecord<String, DataFileWriter> record) { 
     kafkaProducer.send(record); 
    } 

Customer customer1 = new Customer(1001, "James"); 

Parser parser = new Parser(); 
Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("customer.avro")); 

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema); 
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer); 
dataFileWriter.append(customer1); 
dataFileWriter.close(); 

ProducerRecord<String, DataFileWriter> record1 = new ProducerRecord<>("CustomerCountry", 
     "Customer One", dataFileWriter 
); 
fireAndForget(record1); 

Я хочу использовать SpecificDatumWriter писатель вместо общего. С чем связана эта ошибка?

ответ

1

Kafka получает пару значений ключа для сериализации, вы передаете ему DataFileWriter, который не является значением, которое вы хотите сериализовать, это не сработает.

Что вам нужно сделать, это создать массив с сериализованного AVRO через BinaryEncoder и ByteArrayOutputStream, а затем передать его в ProducerRecord<String, byte[]>:

SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema); 
ByteArrayOutputStream os = new ByteArrayOutputStream(); 

try { 
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null); 
    writer.write(customer1, encoder); 
    e.flush(); 

    byte[] avroBytes = os.toByteArray(); 
    ProducerRecord<String, byte[]> record1 = 
    new ProducerRecord<>("CustomerCountry", "Customer One", avroBytes); 

    kafkaProducer.send(record1); 
} finally { 
    os.close(); 
} 
+0

Эй @Yuval, согласно этому решению, я получаю 'Исключение в потоке" main "java.lang.ClassCastException: com.harmeetsingh13.java.producers.avroserializer.Customer не может быть передан в org.apache.avro.generic.IndexedRecord' Exception –

+0

@HarmeetSinghTaara Ну, это еще одна проблема, я предлагаю вы отправляете другой вопрос, который включает определение «Клиент» и полный стек. Похоже, вы не внедряете интерфейс 'ISpecificRecord', который требуется для типов, которые вы хотите сериализовать. В противном случае вы можете использовать «GenericDatumWriter». –

+0

Okay @Yuval. Но в соответствии с этим решением нам нужно преобразовать наши данные в байты, и после этого мы отправим сообщение в kafka, не возможно ли указать нашу схему на avro, и avro автоматически обрабатывает все? –

 Смежные вопросы

  • Нет связанных вопросов^_^