Здесь у вас есть пример использования собственного сериализатора/десериализатора для значения сообщения Kafka. Для ключа сообщения Kafka это одно и то же.
Мы хотим отправить сериализованную версию MyMessage как значение Kafka и снова десериализовать ее в объект MyMessage со стороны потребителя.
Сериализация MyMessage на стороне производителя.
Вы должны создать класс, который реализует сериализатор org.apache.kafka.common.serialization.Serializer
Serialize() метода сделать работу, получая свой объект и возвращают упорядоченную версию как байты массив.
public class MyValueSerializer implements Serializer<MyMessage>
{
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}
@Override
public byte[] serialize(String topic, MyMessage message)
{
if (message == null) {
return null;
}
try {
(serialize your MyMessage object into bytes)
return bytes;
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error serializing value", e);
}
}
@Override
public void close()
{
}
}
final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);
int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));
Deserializing MyMessage в потребительской стороне.
Вы должны создать класс, который реализует десериализатор org.apache.kafka.common.serialization.Deserializer
десериализацию() метода сделать работу, получая сериализованное значение как байты массива и возвращают свой объект.
public class MyValueDeserializer implements Deserializer<MyMessage>
{
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}
@Override
public MyMessage deserialize(String s, byte[] value)
{
if (value == null) {
return null;
}
try {
(deserialize value into your MyMessage object)
MyMessage message = new MyMessage();
return message;
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error deserializing value", e);
}
}
@Override
public void close()
{
}
}
Затем использовать его как это:
final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);
ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {
int kafkaKey = record.key();
MyMessage kafkaValue = record.value();
...
}
Не могли бы вы уточнить, если этот вопрос для Кафки 0.9+ –