Есть работа, в которой есть список задач. Каждая задача имеет идентификатор, имя, статус.Весна Интеграция Java DSL - запуск нескольких активаторов службы async?
Я создал сервис активаторы для каждой задачи, а именно:
@ServiceActivator
public Message<Task> execute(Message<Task> message){
//do stuff
}
Я создал шлюз для задания и в потоке интеграции, начиная от шлюза:
@Bean
public IntegrationFlow startJob() {
return f -> f
.handle("jobService", "execute")
.channel("TaskRoutingChannel");
}
@Bean
public IntegrationFlow startJobTask() {
return IntegrationFlows.from("TaskRoutingChannel")
.handle("jobService", "executeTasks")
.route("headers['Destination-Channel']")
.get();
}
@Bean
public IntegrationFlow TaskFlow() {
return IntegrationFlows.from("testTaskChannel")
.handle("aTaskService", "execute")
.channel("TaskRoutingChannel")
.get();
}
@Bean
public IntegrationFlow TaskFlow2() {
return IntegrationFlows.from("test2TaskChannel")
.handle("bTaskService", "execute")
.channel("TaskRoutingChannel")
.get();
}
У меня есть задачи для выполнения последовательно, используя маршрутизаторы, как указано выше.
Однако мне нужно запустить задание, выполнить все его задачи параллельно. Я не мог понять, как это осуществить. Я попытался использовать @Async
в методах активатора службы и сделать его возвратом void
. но в этом случае, как я привяжу его обратно к каналу маршрутизации и запустим его следующую задачу? Пожалуйста, помогите. Благодарю.
EDIT:
Я использовал RecepientListRouter вместе с ExecutorChannel получить параллельное выполнение:
@Bean
public IntegrationFlow startJobTask() {
return IntegrationFlows.from("TaskRoutingChannel")
.handle("jobService", "executeTasks")
.routeToRecipients(r -> r
.recipient("testTaskChannel")
.recipient("test2TaskChannel"))
.get();
}
@Bean ExecutorChannel testTaskChannel(){
return new ExecutorChannel(this.getAsyncExecutor());
}
@Bean ExecutorChannel test2TaskChannel(){
return new ExecutorChannel(this.getAsyncExecutor());
}
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(10);
executor.initialize();
return executor;
}
Теперь, 3 вопроса: 1) Если это хороший подход, как я отправлять конкретные части полезной нагрузки каждому каналу получателя. Предположим, что полезная нагрузка - это список <>, и я хочу отправить каждый элемент списка на каждый канал. 2) Как динамически установить канал получателя? сказать из заголовка? или список? 3) Это действительно хороший подход? Есть ли предпочтительный способ сделать это?
Спасибо заранее.
См. Мое обновление также. –
Спасибо за ответ. Я предполагаю, что я не указал в вопросе, что я хотел бы, чтобы отдельные задачи выполнялись асинхронно. TestTaskChannel и test2TaskChannel .. Это означало бы, что я должен сделать эти два канала ExecutorChannel – Ocelot
. Также, пожалуйста, проверьте этот поток? - http://stackoverflow.com/q/31959480/1866983 – Ocelot