Я пытаюсь записать данные в формате avro с моего кода Java в Kafka на HDFS с помощью соединителя HDFS kafka, и у меня возникают некоторые проблемы. Когда я использую простую схему и данные, представленную на сайте сливающихся платформ, я могу записать данные в HDFS, но когда я пытаюсь использовать сложную Avro схему, я получаю эту ошибку в журналах коннектора HDFS:Ошибка при записи в HDFS с использованием Kafka HDFS Connect
ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Did not find matching union field for data: PROD
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:973)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:782)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Я использую Confluent платформу 3.0.0
Мои Java-код:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", <url>);
// Set any other properties
KafkaProducer producer = new KafkaProducer(props);
Schema schema = new Schema.Parser().parse(new FileInputStream("avsc/schema.avsc"));
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
InputStream input = new FileInputStream("json/data.json");
DataInputStream din = new DataInputStream(input);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
Object datum = null;
while (true) {
try {
datum = reader.read(null, decoder);
} catch (EOFException e) {
break;
}
}
ProducerRecord<Object, Object> message = new ProducerRecord<Object, Object>(topic, datum);
producer.send(message);
producer.close();
схема (это создается из AVDL файла):
{
"type" : "record",
"name" : "RiskMeasureEvent",
"namespace" : "risk",
"fields" : [ {
"name" : "info",
"type" : {
"type" : "record",
"name" : "RiskMeasureInfo",
"fields" : [ {
"name" : "source",
"type" : {
"type" : "record",
"name" : "Source",
"fields" : [ {
"name" : "app",
"type" : {
"type" : "record",
"name" : "Application",
"fields" : [ {
"name" : "csi_id",
"type" : "string"
}, {
"name" : "name",
"type" : "string"
} ]
}
}, {
"name" : "env",
"type" : {
"type" : "record",
"name" : "Environment",
"fields" : [ {
"name" : "value",
"type" : [ {
"type" : "enum",
"name" : "EnvironmentConstants",
"symbols" : [ "DEV", "UAT", "PROD" ]
}, "string" ]
} ]
}
}, ...
Файл JSON:
{
"info": {
"source": {
"app": {
"csi_id": "123",
"name": "ABC"
},
"env": {
"value": {
"risk.EnvironmentConstants": "PROD"
}
}, ...
Вроде бы проблема со схемой, но я не могу определить проблему.
Привет, Джереми, спасибо за ваше исправление. Я загрузил последний код реестра схемы из вашего филиала. Поскольку он еще не включен в конфлюентный пакет, я загрузил код для apache kafka и kafka-hdfs-connect и создал их локально. При попытке запустить hdfs-коннектор он вызывает ошибку при загрузке файла AvroConverter (который находится в системном реестре). Могу ли я узнать, как настроить коннектор, чтобы он мог найти эту банку? – iiSGii