У меня есть каталог SFTP и чтение файлов и отправка файлов для дальнейшей обработки в ServiceActivator. В любой момент мне нужно обрабатывать их параллельно с помощью обработчика.Нужно обрабатывать несколько файлов параллельно в Spring Integration
Вот моя интеграция с SPG java DSL.
IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory())
.temporaryFileSuffix("COPY")
.localDirectory(directory)
.deleteRemoteFiles(false)
.preserveTimestamp(true)
.remoteDirectory("remoteDir"))
.patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5)))
.handle("mybean", "myMethod")
.handle(Files.outboundAdapter(new File("success")))
.deleteSourceFiles(true)
.autoCreateDirectory(true))
.get();
Update: Вот мой ThreadPoolExecutor:
@Bean(name = "executor")
public Executor getExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(20);
executor.initialize();
return executor;
}
Я попробовал эту опцию, я получил исключение. Вызвано: java.util.concurrent.RejectedExecutionException: Задача o[email protected]41c6ed7a отклонено из [email protected] [Запуск, размер пула = 4, активные потоки = 4, задачи в очереди = 20, завершенные задания = 1] Кроме того, выполнение выполняется очень медленно. Чем меньше один поток. – Harish
Это не проблема 'TaskExecutor'. Ваш обработчик как-то медленный, может быть, где-то заблокирован. С другой стороны вы всегда можете использовать 'CallerRunsPolicy' вместо стандартного' AbortPolicy'. –
Где мы добавляем CallerRunsPolicy – Harish