2015-08-12 2 views
1

Есть работа, в которой есть список задач. Каждая задача имеет идентификатор, имя, статус.Весна Интеграция Java DSL - запуск нескольких активаторов службы async?

Я создал сервис активаторы для каждой задачи, а именно:

@ServiceActivator 
public Message<Task> execute(Message<Task> message){ 
    //do stuff 
} 

Я создал шлюз для задания и в потоке интеграции, начиная от шлюза:

@Bean 
    public IntegrationFlow startJob() { 
     return f -> f 
       .handle("jobService", "execute") 
       .channel("TaskRoutingChannel"); 
    } 

@Bean 
    public IntegrationFlow startJobTask() { 
     return IntegrationFlows.from("TaskRoutingChannel") 
       .handle("jobService", "executeTasks") 
       .route("headers['Destination-Channel']") 
       .get(); 
    } 

@Bean 
    public IntegrationFlow TaskFlow() { 
     return IntegrationFlows.from("testTaskChannel") 
       .handle("aTaskService", "execute") 
       .channel("TaskRoutingChannel") 
       .get(); 
    } 

@Bean 
    public IntegrationFlow TaskFlow2() { 
     return IntegrationFlows.from("test2TaskChannel") 
       .handle("bTaskService", "execute") 
       .channel("TaskRoutingChannel") 
       .get(); 
    } 

У меня есть задачи для выполнения последовательно, используя маршрутизаторы, как указано выше.

Однако мне нужно запустить задание, выполнить все его задачи параллельно. Я не мог понять, как это осуществить. Я попытался использовать @Async в методах активатора службы и сделать его возвратом void. но в этом случае, как я привяжу его обратно к каналу маршрутизации и запустим его следующую задачу? Пожалуйста, помогите. Благодарю.

EDIT:

Я использовал RecepientListRouter вместе с ExecutorChannel получить параллельное выполнение:

@Bean 
public IntegrationFlow startJobTask() { 
    return IntegrationFlows.from("TaskRoutingChannel") 
      .handle("jobService", "executeTasks") 
      .routeToRecipients(r -> r 
       .recipient("testTaskChannel") 
       .recipient("test2TaskChannel")) 
      .get(); 
} 

@Bean ExecutorChannel testTaskChannel(){ 
    return new ExecutorChannel(this.getAsyncExecutor()); 
} 

@Bean ExecutorChannel test2TaskChannel(){ 
    return new ExecutorChannel(this.getAsyncExecutor()); 
} 
@Bean 
public Executor getAsyncExecutor() { 
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
    executor.setCorePoolSize(5); 
    executor.setMaxPoolSize(10); 
    executor.setQueueCapacity(10); 
    executor.initialize(); 
    return executor; 
} 

Теперь, 3 вопроса: 1) Если это хороший подход, как я отправлять конкретные части полезной нагрузки каждому каналу получателя. Предположим, что полезная нагрузка - это список <>, и я хочу отправить каждый элемент списка на каждый канал. 2) Как динамически установить канал получателя? сказать из заголовка? или список? 3) Это действительно хороший подход? Есть ли предпочтительный способ сделать это?

Спасибо заранее.

ответ

1

Ваш TaskRoutingChannel должен быть экземпляром ExecutorChannel. Например:

return f -> f 
      .handle("jobService", "execute") 
      .channel(c -> c.executor("TaskRoutingChannel", threadPoolTaskExecutor())); 

В противном случае, да: все вызывается с единственным Thread и это не хорошо для вашей задачи.

UPDATE

Позвольте мне попытаться ответить на ваши вопросы один за другим, хотя это звучит, как каждый из них должен как отдельные SO один :-).

  1. Если вам действительно нужно отправить одно сообщение нескольким услуг, вы можете использовать routeToRecipients, или может обратно к publishSubscribe. Или даже можно выполнить динамическую маршрутизацию, например, на основе header.

  2. Чтобы отправить часть сообщения на каждый канал есть только достаточно мест .split() перед вашим .routeToRecipients()

  3. Чтобы ответить на ваш последний вопрос, мне нужно знать бизнес-требования для выполнения этой задачи.

+0

См. Мое обновление также. –

+0

Спасибо за ответ. Я предполагаю, что я не указал в вопросе, что я хотел бы, чтобы отдельные задачи выполнялись асинхронно. TestTaskChannel и test2TaskChannel .. Это означало бы, что я должен сделать эти два канала ExecutorChannel – Ocelot

+0

. Также, пожалуйста, проверьте этот поток? - http://stackoverflow.com/q/31959480/1866983 – Ocelot