3

Существует только несколько сериализатора доступен как,Как создать собственный сериализатор в kafka?

org.apache.kafka.common.serialization.StringSerializer 
org.apache.kafka.common.serialization.StringSerializer 

Как мы можем создать собственный сериалайзер?

+0

Не могли бы вы уточнить, если этот вопрос для Кафки 0.9+ –

ответ

5

Здесь у вас есть пример использования собственного сериализатора/десериализатора для значения сообщения Kafka. Для ключа сообщения Kafka это одно и то же.

Мы хотим отправить сериализованную версию MyMessage как значение Kafka и снова десериализовать ее в объект MyMessage со стороны потребителя.

Сериализация MyMessage на стороне производителя.

Вы должны создать класс, который реализует сериализатор org.apache.kafka.common.serialization.Serializer

Serialize() метода сделать работу, получая свой объект и возвращают упорядоченную версию как байты массив.

public class MyValueSerializer implements Serializer<MyMessage> 
{ 
    private boolean isKey; 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) 
    { 
     this.isKey = isKey; 
    } 

    @Override 
    public byte[] serialize(String topic, MyMessage message) 
    { 
     if (message == null) { 
      return null; 
     } 

     try { 

      (serialize your MyMessage object into bytes) 

      return bytes; 

     } catch (IOException | RuntimeException e) { 
      throw new SerializationException("Error serializing value", e); 
     } 
    } 

    @Override 
    public void close() 
    { 

    } 
} 

final IntegerSerializer keySerializer = new IntegerSerializer(); 
final MyValueSerializer myValueSerializer = new MyValueSerializer(); 
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer); 

int messageNo = 1; 
int kafkaKey = messageNo; 
MyMessage kafkaValue = new MyMessage(); 
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue); 
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue)); 

Deserializing MyMessage в потребительской стороне.

Вы должны создать класс, который реализует десериализатор org.apache.kafka.common.serialization.Deserializer

десериализацию() метода сделать работу, получая сериализованное значение как байты массива и возвращают свой объект.

public class MyValueDeserializer implements Deserializer<MyMessage> 
{ 
    private boolean isKey; 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) 
    { 
     this.isKey = isKey; 
    } 

    @Override 
    public MyMessage deserialize(String s, byte[] value) 
    { 
     if (value == null) { 
      return null; 
     } 

     try { 

      (deserialize value into your MyMessage object) 

      MyMessage message = new MyMessage(); 
      return message; 

     } catch (IOException | RuntimeException e) { 
      throw new SerializationException("Error deserializing value", e); 
     } 
    } 

    @Override 
    public void close() 
    { 

    } 
} 

Затем использовать его как это:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer(); 
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer(); 
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer); 

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000); 
for (ConsumerRecord<Integer, MyMessage> record : records) { 

    int kafkaKey = record.key(); 
    MyMessage kafkaValue = record.value(); 

    ... 
} 
+0

окончательный KafkaConsumer потребитель = новый KafkaConsumer <> (реквизита, keyDeserializer, myValueDeserializer); –

+0

Выше сказано, что это не так, как в синтаксисе, тогда Как может kafka узнать о десериализаторе –

+1

Deserializer является третьим аргументом конструктора: myValueDeserializer. Весь этот код был взят из рабочего кода, просто изменил некоторые имена. –

0

Вы должны создать свой собственный сериализатор, который реализует интерфейс «Сериализатор» (org.apache.kafka.common.serialization.Serializer), а затем установите для него параметр «key.serializer/value.serializer» производителя.