Я новичок в весенней целой кафке, и я понимаю, что адаптер kafka-oubound-channel. Но есть ли способ, который я могу создавать темы, без необходимости настраивать в контексте xml?Имеет ли весна целочисленность kafka поддерживает динамическое создание темы
ie: на основе моего сообщения для трансформатора я бы хотел опубликовать сообщение для темы kafka, созданной для этого типа сообщения.
Update:
Ниже то, что я в конечном итоге делает. Будет приветствовать любое лучшее решение.
<int:channel id="inputForSolrPublish"></int:channel>
<int:service-activator input-channel="inputForSolrPublish"
ref="solrMasterListRouter" >
->
private void postMessageToMasterSpecifcTopics(final List<String> topicNames,
final String brokerList,
final Message<?> message) throws Exception {
for (String topicName : topicNames) {
createProducerContext(topicName,
brokerList).send(topicName,
message.getHeaders()
.get(KafkaHeaders.MESSAGE_KEY),
message);
}
}
private KafkaProducerContext<String, String> createProducerContext(final String topicName,
final String brokerList) throws Exception {
KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
AvroReflectDatumBackedKafkaEncoder<String> kafkaReflectionEncoder = new AvroReflectDatumBackedKafkaEncoder<>(String.class);
AvroSpecificDatumBackedKafkaEncoder<String> kafkaSpecificEncoder = new AvroSpecificDatumBackedKafkaEncoder<>(String.class);
// Encoder<String> encoder = new
// org.springframework.integration.kafka.serializer.common.StringEncoder<String>();
ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(topicName);
producerMetadata.setValueClassType(String.class);
producerMetadata.setKeyClassType(String.class);
producerMetadata.setValueEncoder(kafkaSpecificEncoder);
producerMetadata.setKeyEncoder(kafkaReflectionEncoder);
producerMetadata.setAsync(true);
Properties props = buildProducerConfigProperties();
ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata,
brokerList,
props);
ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata,
producer.getObject());
kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(topicName,
config));
return kafkaProducerContext;
}
private Properties buildProducerConfigProperties() {
Properties props = new Properties();
props.put("topic.metadata.refresh.interval.ms",
"3600000");
props.put("message.send.max.retries",
"5");
props.put("tsend.buffer.bytes",
"5242880");
return props;
}
Привет @Harshjgs, Пожалуйста, скажите мне использование поля 'KafkaHeaders.MESSAGE_KEY'. Я не могу найти его цель. – rahul