3

У меня есть каталог SFTP и чтение файлов и отправка файлов для дальнейшей обработки в ServiceActivator. В любой момент мне нужно обрабатывать их параллельно с помощью обработчика.Нужно обрабатывать несколько файлов параллельно в Spring Integration

Вот моя интеграция с SPG java DSL.

IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory()) 
         .temporaryFileSuffix("COPY") 
         .localDirectory(directory) 
         .deleteRemoteFiles(false) 
         .preserveTimestamp(true) 
         .remoteDirectory("remoteDir")) 
         .patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5))) 
         .handle("mybean", "myMethod") 
         .handle(Files.outboundAdapter(new File("success")))   
         .deleteSourceFiles(true) 
         .autoCreateDirectory(true)) 
         .get(); 

Update: Вот мой ThreadPoolExecutor:

@Bean(name = "executor") 
public Executor getExecutor() 
{ 
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
    executor.setCorePoolSize(4); 
    executor.setMaxPoolSize(4); 
    executor.setQueueCapacity(20);   
    executor.initialize(); 
    return executor; 
} 

ответ

1

Sftp.inboundAdapter() (SftpInboundFileSynchronizingMessageSource) возвращает удаленные файлы по одному в любом случае. Прежде всего, он синхронизирует их с локальным каталогом и только после этого опросит их для обработки сообщений как полезную нагрузку File.

Чтобы обрабатывать их параллельно, этого было бы достаточно, чтобы добавить taskExecutor в ваше определение e.poller(), и все эти maxMessagesPerPoll(5) будут распространяться по различным темам.

+0

Я попробовал эту опцию, я получил исключение. Вызвано: java.util.concurrent.RejectedExecutionException: Задача o[email protected]41c6ed7a отклонено из [email protected] [Запуск, размер пула = 4, активные потоки = 4, задачи в очереди = 20, завершенные задания = 1] Кроме того, выполнение выполняется очень медленно. Чем меньше один поток. – Harish

+0

Это не проблема 'TaskExecutor'. Ваш обработчик как-то медленный, может быть, где-то заблокирован. С другой стороны вы всегда можете использовать 'CallerRunsPolicy' вместо стандартного' AbortPolicy'. –

+0

Где мы добавляем CallerRunsPolicy – Harish