Я использую следующий код (не совсем, но давайте предположим), чтобы создать схему и отправить ее кафке производителем.как отправить схему avro ТОЛЬКО один раз в kafka
public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"str2\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
Вещь - это код, позволяющий мне отправить только 1 сообщение с этой схемой. Затем мне нужно изменить имя схемы, чтобы отправить следующее сообщение ... поэтому строка имени является случайным образом сгенерирована прямо сейчас, поэтому я могу отправить больше сообщений. Это хак, поэтому я хотел бы знать правильный способ сделать это.
Я также посмотрел, как отправлять сообщения без схемы (т.е. уже отправлено 1 сообщение с схемой в kafka, теперь все остальные сообщения больше не нуждаются в схеме) - но new GenericData.Record(..)
ожидает параметр схемы. Если он равен нулю, он выдает ошибку.
Итак, каков правильный способ отправки сообщений об астро-схемах в kafka?
Вот еще один пример кода - довольно идентичен моему:
https://github.com/confluentinc/examples/blob/kafka-0.10.0.1-cp-3.0.1/kafka-clients/producer/src/main/java/io/confluent/examples/producer/ProducerExample.java
Он также не показывает, как отправить без установки схемы.
Спасибо, я буду перефразировать мой вопрос: я хочу написать от kafka до hdfs. Из того, что я видел, мне нужно иметь данные в кафке в формате avro, иначе hdfs ничего не пишет. Теперь я пытаюсь записать данные в кафку в формате avro. Я хочу зарегистрировать схему один раз (или вообще не) и отправить сообщения. У вас есть пример кода? Нужно ли сначала регистрировать схему с сервером схемы, прежде чем отправлять данные в кафку? Могу ли я отправлять данные в kafka в формате avro без добавления схемы в сообщении, как в приведенном выше коде. – Adrian
Забыл упомянуть, что каждый раз, когда я отправляю сообщение, мне нужно изменить имя схемы, иначе я получаю сообщение об ошибке: 'Can not redefine: myrecord'. Это ошибка? – Adrian
HDFS - это всего лишь файловая система. Вы выбираете, что и как вы записываете данные в HDFS. Как вы планировали переместить данные из Kafka в HDFS? У вас должен быть процесс, который будет отвечать за него. Возможно, процесс, который вы планировали использовать, предусматривает использование Avro, но это не является обязательным условием HDFS. Вы можете записывать данные в различных форматах. Здесь вы можете привести пример написания строк в Kafka: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example Позже вы можете использовать эти строки и записать их в HDFS с использованием API HDFS. –