2

Я новичок в весенней целой кафке, и я понимаю, что адаптер kafka-oubound-channel. Но есть ли способ, который я могу создавать темы, без необходимости настраивать в контексте xml?Имеет ли весна целочисленность kafka поддерживает динамическое создание темы

ie: на основе моего сообщения для трансформатора я бы хотел опубликовать сообщение для темы kafka, созданной для этого типа сообщения.

Update:

Ниже то, что я в конечном итоге делает. Будет приветствовать любое лучшее решение.

<int:channel id="inputForSolrPublish"></int:channel> 

<int:service-activator input-channel="inputForSolrPublish" 
    ref="solrMasterListRouter" > 

->

private void postMessageToMasterSpecifcTopics(final List<String> topicNames, 
               final String brokerList, 
               final Message<?> message) throws Exception { 

    for (String topicName : topicNames) { 
     createProducerContext(topicName, 
           brokerList).send(topicName, 
               message.getHeaders() 
                 .get(KafkaHeaders.MESSAGE_KEY), 
               message); 

    } 

} 

private KafkaProducerContext<String, String> createProducerContext(final String topicName, 
                    final String brokerList) throws Exception { 
    KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>(); 
    AvroReflectDatumBackedKafkaEncoder<String> kafkaReflectionEncoder = new AvroReflectDatumBackedKafkaEncoder<>(String.class); 
    AvroSpecificDatumBackedKafkaEncoder<String> kafkaSpecificEncoder = new AvroSpecificDatumBackedKafkaEncoder<>(String.class); 
    // Encoder<String> encoder = new 
    // org.springframework.integration.kafka.serializer.common.StringEncoder<String>(); 

    ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(topicName); 
    producerMetadata.setValueClassType(String.class); 
    producerMetadata.setKeyClassType(String.class); 
    producerMetadata.setValueEncoder(kafkaSpecificEncoder); 
    producerMetadata.setKeyEncoder(kafkaReflectionEncoder); 
    producerMetadata.setAsync(true); 

    Properties props = buildProducerConfigProperties(); 
    ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata, 
                          brokerList, 
                          props); 
    ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata, 
                          producer.getObject()); 
    kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(topicName, 
                      config)); 
    return kafkaProducerContext; 
} 

private Properties buildProducerConfigProperties() { 
    Properties props = new Properties(); 
    props.put("topic.metadata.refresh.interval.ms", 
       "3600000"); 
    props.put("message.send.max.retries", 
       "5"); 
    props.put("tsend.buffer.bytes", 
       "5242880"); 
    return props; 

} 
+0

Привет @Harshjgs, Пожалуйста, скажите мне использование поля 'KafkaHeaders.MESSAGE_KEY'. Я не могу найти его цель. – rahul

ответ

2

Да, вы можете сделать это во время выполнения. См. TopicUtils.ensureTopicCreated.

Вы можете добавить его как <service-activator> как еще один абонент (первый) к <publish-subscribe-channel> для отправки сообщений. Нечто подобное:

<publish-subscribe-channel id="sendMessageToKafkaChannel"/> 

<service-activator input-channel="sendMessageToKafkaChannel" output-channel="nullChannel" order="1" 
    ref="creatTopicService" method="creatTopic"/> 

<int-kafka:outbound-channel-adapter channel="sendMessageToKafkaChannel" order="2"/> 

Принять в этом creatTopic всего сообщения и извлечь все необходимые параметры из сообщения или во время фазы впрыска, например, вводят ZookeeperConnect для извлечения getZkConnect() для первого zkAddressensureTopicCreated аргумент.

Но вы должны понимать, что у вас не может быть <int-kafka:message-driven-channel-adapter> без существующей темы на Kafka. Итак, я не уверен, как вы собираетесь общаться с сообщениями в динамически созданной теме в будущем. Хотя <int-kafka:inbound-channel-adapter> может работать на этот случай ...

+0

спасибо Aetem. Я закончил делать что-то очень похожее на ваше предложение. Дайте мне знать, что вы думаете. – Harshjags

+0

Имея конфигурацию '', вы можете добавить новую 'ProducerConfiguration' в существующую, используя ее' getProducerConfigurations() 'С другой стороны вы можете использовать единственную' 'и всегда отправлять сообщения в' '. Это потому, что вы используете в своем коде тот же самый «brokerList», а также все другие опции «ProducerConfiguration». –

+0

Thnks. разумно будет делать эти весенние очищения. и будет делать больше повторного использования компонентов. – Harshjags

 Смежные вопросы

  • Нет связанных вопросов^_^