2015-12-19 5 views
3

Я пытаюсь реализовать следующий рабочий процесс с весны интеграции:Spring Integration Cassandra сохранение рабочего процесса

  • 1) Опрос REST API
  • 2) хранить POJO в Cassandra кластера

Это мой первый попробуйте с помощью Spring Integration, так что я все еще немного перегружен массой информации из ссылки. После некоторых исследований я мог бы выполнить следующую работу.

  • 1) Опрос REST API
  • 2) Преобразование отображенный POJO JSON результат в строку
  • 3) сохранить строки в файл

Вот код:

@Configuration 
public class ConsulIntegrationConfig { 

    @InboundChannelAdapter(value = "consulHttp", poller = @Poller(maxMessagesPerPoll = "1", fixedDelay = "1000")) 
    public String consulAgentPoller() { 
     return ""; 
    } 

    @Bean 
    public MessageChannel consulHttp() { 
     return MessageChannels.direct("consulHttp").get(); 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "consulHttp") 
    MessageHandler consulAgentHandler() { 
     final HttpRequestExecutingMessageHandler handler = 
      new HttpRequestExecutingMessageHandler("http://localhost:8500/v1/agent/self"); 
     handler.setExpectedResponseType(AgentSelfResult.class); 
     handler.setOutputChannelName("consulAgentSelfChannel"); 
     LOG.info("Created bean'consulAgentHandler'"); 
     return handler; 
    } 

    @Bean 
    public MessageChannel consulAgentSelfChannel() { 
     return MessageChannels.direct("consulAgentSelfChannel").get(); 
    } 

    @Bean 
    public MessageChannel consulAgentSelfFileChannel() { 
     return MessageChannels.direct("consulAgentSelfFileChannel").get(); 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "consulAgentSelfFileChannel") 
    MessageHandler consulAgentFileHandler() { 
     final Expression directoryExpression = new SpelExpressionParser().parseExpression("'./'"); 
     final FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression); 
     handler.setFileNameGenerator(message -> "../../agent_self.txt"); 
     handler.setFileExistsMode(FileExistsMode.APPEND); 
     handler.setCharset("UTF-8"); 
     handler.setExpectReply(false); 
     return handler; 
    } 
} 

@Component 
public final class ConsulAgentTransformer { 

    @Transformer(inputChannel = "consulAgentSelfChannel", outputChannel = "consulAgentSelfFileChannel") 
    public String transform(final AgentSelfResult json) throws IOException { 
     final String result = new StringBuilder(json.toString()).append("\n").toString(); 
     return result; 
} 

Это прекрасно работает!

Но теперь вместо того, чтобы записывать объект в файл, я хочу сохранить его в кластере Cassandra с помощью spring-data-cassandra. Для этого я закомментирована обработчик файлов в конфигурационном файле, вернуть POJO в трансформаторе и создал следующие,:

@MessagingGateway(name = "consulCassandraGateway", defaultRequestChannel = "consulAgentSelfFileChannel") 
public interface CassandraStorageService { 

    @Gateway(requestChannel="consulAgentSelfFileChannel") 
    void store(AgentSelfResult agentSelfResult); 
} 

@Component 
public final class CassandraStorageServiceImpl implements CassandraStorageService { 

    @Override 
    public void store(AgentSelfResult agentSelfResult) { 
     //use spring-data-cassandra repository to store 
     LOG.info("Received 'AgentSelfResult': {} in Cassandra cluster..."); 
     LOG.info("Trying to store 'AgentSelfResult' in Cassandra cluster..."); 
} 
} 

Но это, кажется, неправильный подход, метод обслуживания никогда не срабатывает.

Так что мой вопрос в том, что было бы правильным подходом к моей утилизации? Должен ли я реализовывать интерфейс MessageHandler в моем сервисном компоненте и использовать @ServiceActivator в моей конфигурации. Или что-то не хватает в моем текущем «шлюзе-подходе» ?? Или, может быть, есть другое решение, что я не в состоянии видеть ..

Как упоминалось ранее, я новичок в SI, так что это может быть глупый вопрос ...

Тем не менее, спасибо много заранее!

ответ

0

Непонятно, как вы подключаетесь к фасоли CassandraStorageService.

У Spring Integration Cassandra Extension Project есть реализация обработчика сообщений.

Cassandra Sink in spring-cloud-stream-modules использует его с конфигурацией Java, поэтому вы можете использовать это в качестве примера.

+0

Да, я знаю. Кажется, я еще не понял шлюз. Другая идея заключалась в том, чтобы подключить услугу к трансформатору, но это не кажется мне чистым. Ссылки выглядят многообещающими, я проверю их. Спасибо, пока. –

0

Так что я, наконец, сделал это работу. Все, что мне нужно было сделать

@Component 
public final class CassandraStorageServiceImpl implements CassandraStorageService { 

    @ServiceActivator(inputChannel="consulAgentSelfFileChannel") 
    @Override 
    public void store(AgentSelfResult agentSelfResult) { 
     //use spring-data-cassandra repository to store 
     LOG.info("Received 'AgentSelfResult': {}..."); 
     LOG.info("Trying to store 'AgentSelfResult' in Cassandra cluster..."); 
    } 
} 

CassandraMessageHandler и весна-облако потоковым, казалось, в большие накладные расходы на мой прецеденту, и я не очень понимаю, до сих пор ... И с этим решением, Я держу контроль над тем, что происходит в моем весеннем компоненте.