2017-02-01 8 views
0

Требование заключается в установлении тем атрибут ниже во время выполнения без перезапуска сервера .Как мы можем достичь здесь. В настоящее время мы читаем значение из файла свойств, но здесь требуется перезагрузка сервера, чтобы отразить сделанные изменения.Передача динамического значения Кафке потребителей темы во время выполнения для сообщений приводится адаптер канала

пример: sample.properties (внутри каталога развертывания)

topic.list = topic1, topic2

и хотели бы потреблять от topic3 ​​в будущем без перезагрузки сервера.

ПРИМЕЧАНИЕ: найти что темы является окончательным переменной.

попытался прочитать ключ (topic.list) из пути к файловой системе (вне каталога развертывания), но не повезло.

Любое предложение.

<int-kafka:message-driven-channel-adapter 

       id="inAdapter" 
       channel="fromKafka" 
       connection-factory="connectionFactory" 
       key-decoder="kafkaKeyDecoder" 
       payload-decoder="kafkaDecoder"        
       topics="${topic.list}" 
       offset-manager="offsetManager"/> 
+0

Как часто вы меняете тему? всегда есть способ (динамическое создание канального адаптера в этом случае) немного долго, но для этого потребуется также внешнее вмешательство (может быть api-вызов для запуска действия) – iamiddy

+0

Не совсем часто. Однако это требование иметь гибкость. Будет здорово, вы можете направить меня в этом направлении. – sam

+0

попытайтесь понять эту концепцию здесь https://github.com/spring-projects/spring-integration-samples/tree/master/advanced/dynamic-ftp, тогда вы можете легко настроить ее в соответствии с вашим прецедентом, и я могу помочь если у вас есть последующие вопросы – iamiddy

ответ

1

Вы можете использовать DSL Java динамически добавлять адаптеры для дополнительных тем по запросу ...

@Autowired 
private IntegrationFlowContext flowContext; 

public void addAnotherListenerForTopics(String... topics) { 
    IntegrationFlow flow = 
     IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), topics)) 
      .channel("fromKafka") 
      .get(); 
    this.flowContext.registration(flow).register(); 
} 

и

bean.addAnotherListenerForTopics("added.new"); 

ПОМ:

<dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-java-dsl</artifactId> 
    <version>1.2.1.RELEASE</version> 
</dependency> 

Примечание что если вы используете брокерский раздел assi gnment, новый контейнер нуждается в другом идентификаторе группы, чтобы избежать отмены существующих назначений.

+0

Не могли бы вы объяснить теорию этого решения более подробно для лучшего подчёркивания? –

+0

Я не уверен, что у вас возникли проблемы с пониманием; регистрация динамического потока включена в [это сообщение в блоге] (https://spring.io/blog/2016/09/27/java-dsl-for-spring-integration-1-2-release-candidate-1-is- доступный). Задайте новый вопрос, если вам нужно что-то еще. –

+0

Пожалуйста, ознакомьтесь с этим: https://stackoverflow.com/questions/46400329/how-to-pass-topics-dynamically-to-a-kafka-listener –