1

Я использую следующий код (не совсем, но давайте предположим), чтобы создать схему и отправить ее кафке производителем.как отправить схему avro ТОЛЬКО один раз в kafka

public static final String USER_SCHEMA = "{" 
     + "\"type\":\"record\"," 
     + "\"name\":\"myrecord\"," 
     + "\"fields\":[" 
     + " { \"name\":\"str1\", \"type\":\"string\" }," 
     + " { \"name\":\"str2\", \"type\":\"string\" }," 
     + " { \"name\":\"int1\", \"type\":\"int\" }" 
     + "]}"; 

public static void main(String[] args) throws InterruptedException { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 

    Schema.Parser parser = new Schema.Parser(); 
    Schema schema = parser.parse(USER_SCHEMA); 
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema); 

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props); 

    for (int i = 0; i < 1000; i++) { 
     GenericData.Record avroRecord = new GenericData.Record(schema); 
     avroRecord.put("str1", "Str 1-" + i); 
     avroRecord.put("str2", "Str 2-" + i); 
     avroRecord.put("int1", i); 

     byte[] bytes = recordInjection.apply(avroRecord); 

     ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes); 
     producer.send(record); 

     Thread.sleep(250); 

    } 

    producer.close(); 
} 

Вещь - это код, позволяющий мне отправить только 1 сообщение с этой схемой. Затем мне нужно изменить имя схемы, чтобы отправить следующее сообщение ... поэтому строка имени является случайным образом сгенерирована прямо сейчас, поэтому я могу отправить больше сообщений. Это хак, поэтому я хотел бы знать правильный способ сделать это.

Я также посмотрел, как отправлять сообщения без схемы (т.е. уже отправлено 1 сообщение с схемой в kafka, теперь все остальные сообщения больше не нуждаются в схеме) - но new GenericData.Record(..) ожидает параметр схемы. Если он равен нулю, он выдает ошибку.

Итак, каков правильный способ отправки сообщений об астро-схемах в kafka?

Вот еще один пример кода - довольно идентичен моему:
https://github.com/confluentinc/examples/blob/kafka-0.10.0.1-cp-3.0.1/kafka-clients/producer/src/main/java/io/confluent/examples/producer/ProducerExample.java

Он также не показывает, как отправить без установки схемы.

ответ

1

я не понял строку:

The thing is the code allows me to send only 1 message with this schema. Then I need to change the schema name in order to send the next message.

В обоих примерах, ваш и сливающийся пример вы прилагается, схема не передается Кафки.

В приведенном примере схема, используемая для создания объекта GenericRecord. Вы предоставляете схему, потому что вы хотите проверить запись на какой-либо схеме (например, подтвердите, что вы могли бы поместить целое int1-поле внутри объекта GenericRecord).

В вашем коде различие заключается в том, что вы решили сериализовать данные на байт [], что, вероятно, не требуется, поскольку вы можете делегировать эту ответственность KafkaAvroSerializer, как вы можете видеть в примере слияния.

GenericRecord - это объект Avro, это не исполнение Кафкой. Если вы хотите отправить какой-либо объект в Kafka (с помощью схемы или без него), вам просто нужно создать (или использовать выходной) сериализатор, который преобразует ваш объект в байт [], и установить этот сериализатор в свойствах, которые вы создаете для режиссер.

Обычно рекомендуется отправить указатель на схему с сообщением Avro. Вы можете найти обоснование для этого по следующим ссылкам: http://www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one/

+0

Спасибо, я буду перефразировать мой вопрос: я хочу написать от kafka до hdfs. Из того, что я видел, мне нужно иметь данные в кафке в формате avro, иначе hdfs ничего не пишет. Теперь я пытаюсь записать данные в кафку в формате avro. Я хочу зарегистрировать схему один раз (или вообще не) и отправить сообщения. У вас есть пример кода? Нужно ли сначала регистрировать схему с сервером схемы, прежде чем отправлять данные в кафку? Могу ли я отправлять данные в kafka в формате avro без добавления схемы в сообщении, как в приведенном выше коде. – Adrian

+0

Забыл упомянуть, что каждый раз, когда я отправляю сообщение, мне нужно изменить имя схемы, иначе я получаю сообщение об ошибке: 'Can not redefine: myrecord'. Это ошибка? – Adrian

+0

HDFS - это всего лишь файловая система. Вы выбираете, что и как вы записываете данные в HDFS. Как вы планировали переместить данные из Kafka в HDFS? У вас должен быть процесс, который будет отвечать за него. Возможно, процесс, который вы планировали использовать, предусматривает использование Avro, но это не является обязательным условием HDFS. Вы можете записывать данные в различных форматах. Здесь вы можете привести пример написания строк в Kafka: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example Позже вы можете использовать эти строки и записать их в HDFS с использованием API HDFS. –

 Смежные вопросы

  • Нет связанных вопросов^_^