2016-12-21 10 views
0

В рассматривающих примерах, которые я вижу много это:Apache Flink чтение Avro байт [] от Кафки

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties); 

я вижу, что они здесь уже знают схему.

Я не знаю схему до тех пор, пока не прочитаю байт [] в общей записи , а затем получить схему. (Как он может меняться от записи к записи)

Может кто-то мне точку в FlinkKafkaConsumer08, который читает из byte[] в карты фильтр, так что я могу удалить некоторые ведущие биты, а затем загрузить byte[] в Generic Record?

ответ

0

я делаю что-то подобное (я использую 09 потребителя)

В вашем основном коде проходе в пользовательском десериализаторе:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
       parameterTool.getRequired("topic"), new MyDeserializationSchema<>(), 
       parameterTool.getProperties()); 

Обычай Десериализация схема считывает байты, выясняет схему и/или извлекает ее из реестра схем, десериализуется в GenericRecord и возвращает объект GenericRecord.

public class MyDeserializationSchema<T> implements DeserializationSchema<T> { 


    private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class; 

    @Override 
    public T deserialize(byte[] arg0) throws IOException { 
     //do your stuff here, strip off your bytes 
     //deserialize and create your GenericRecord 
     return (T) (myavroevent); 
    } 

    @Override 
    public boolean isEndOfStream(T nextElement) { 
     return false; 
    } 

    @Override 
    public TypeInformation<T> getProducedType() { 
     return TypeExtractor.getForClass(avrotype); 
    } 

} 
+0

Wow работает прямо из коробки. Спасибо, теперь очевидно, что я смотрю на это. – Don

0

При использовании реестра схемы Confluent, я считаю, что предпочтительным решением было бы использовать Avro serde предоставленную вырожденная. Таким образом, мы просто вызываем deserialize(), и разрешение последней версии используемой схемы Avro выполняется автоматически позади сцены, и никакие манипуляции с байтами не требуются.

Она сводится к тому, что-то вроде этого (пример кода в Скале, ява решение было бы очень похоже):

import io.confluent.kafka.serializers.KafkaAvroDeserializer 

... 

val valueDeserializer = new KafkaAvroDeserializer() 
valueDeserializer.configure(
    Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava, 
    false) 

... 

override def deserialize(messageKey: Array[Byte], message: Array[Byte], 
         topic: String, partition: Int, offset: Long): KafkaKV = { 

    val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord] 
    val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord] 

    KafkaKV(key, value) 
    } 

... 

Этот метод требует, чтобы производитель сообщение также интегрирован с реестром схемы и публикует схема там. Это может быть сделано в очень похожим образом, как описано выше, с использованием Сливной-х KafkaAvroSerializer

Я отправил подробное объяснение здесь: How to integrate Flink with Confluent's schema registry