Я использую Apache Kafka с Avro Serializer, используя определенный формат. Я пытаюсь создать свой собственный пользовательский класс и использовать его как значение сообщения kafka. Но когда я пытаюсь отправить сообщение, я получаю следующее исключение:Kafka Avro Сериализатор: org.apache.avro.AvroRuntimeException: не открыт
Exception in thread "main" org.apache.avro.AvroRuntimeException: not open
at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:287)
at com.harmeetsingh13.java.producers.avroserializer.AvroProducer.main(AvroProducer.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Мои Avro схемы файл, как показано ниже: Класс
{
"namespace": "customer.avro",
"type": "record",
"name": "Customer",
"fields": [{
"name": "id",
"type": "int"
}, {
"name": "name",
"type": "string"
}]
}
клиентов:
public class Customer {
public int id;
public String name;
public Customer() {
}
public Customer(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
Данные Сериализация с использованием Avro :
public static void fireAndForget(ProducerRecord<String, DataFileWriter> record) {
kafkaProducer.send(record);
}
Customer customer1 = new Customer(1001, "James");
Parser parser = new Parser();
Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("customer.avro"));
SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
DataFileWriter<Customer> dataFileWriter = new DataFileWriter<>(writer);
dataFileWriter.append(customer1);
dataFileWriter.close();
ProducerRecord<String, DataFileWriter> record1 = new ProducerRecord<>("CustomerCountry",
"Customer One", dataFileWriter
);
fireAndForget(record1);
Я хочу использовать SpecificDatumWriter
писатель вместо общего. С чем связана эта ошибка?
Эй @Yuval, согласно этому решению, я получаю 'Исключение в потоке" main "java.lang.ClassCastException: com.harmeetsingh13.java.producers.avroserializer.Customer не может быть передан в org.apache.avro.generic.IndexedRecord' Exception –
@HarmeetSinghTaara Ну, это еще одна проблема, я предлагаю вы отправляете другой вопрос, который включает определение «Клиент» и полный стек. Похоже, вы не внедряете интерфейс 'ISpecificRecord', который требуется для типов, которые вы хотите сериализовать. В противном случае вы можете использовать «GenericDatumWriter». –
Okay @Yuval. Но в соответствии с этим решением нам нужно преобразовать наши данные в байты, и после этого мы отправим сообщение в kafka, не возможно ли указать нашу схему на avro, и avro автоматически обрабатывает все? –