4

Мне недавно пришлось реализовать контроллер, который передает файлы с A на B. Существует около 8000 файлов с размером по 1-2 мб каждый.Реализация динамического контроллера передачи файлов с использованием ThreadPoolExecutor в Java

  • Если передача файла прошла успешно, создайте еще один поток. (в настоящее время увеличивать значение corePooleSize +1)
  • Если сбой одного файла невозможен, закройте одну попытку переноса потока. (в настоящее время увеличивайте corePooleSize -1)
  • Если сбой одного файла невозможен, не создавайте другой поток в течение определенного времени.

Идея заключается в том, чтобы получить максимально возможное количество соединений/лучшую скорость передачи, не зная ограничений хоста.

Теперь, мой вопрос, является ли ThreadPoolExecutor лучшим способом реализовать это поведение или есть лучший способ?

//Code simplified 
//add all files to callables with type Future<Boolean> 
while (true) { 

    // entry = get the first result that's done. 

    if (entry.getValue().get() == Boolean.TRUE) { 
     results.remove(entry.getKey()); 
     if (results.size() > threadPool.getCorePoolSize()) { 

      if (System.currentTimeMillis() >= nextAttempt) 
       resizeThreadPool(+1); 
     } 
    } else { 
     resizeThreadPool(-1); 
     nextAttempt = System.currentTimeMillis() + someTimeinMs; 
     entry.setValue(threadPool.submit(entry.getKey())); 
    } 
    if (results.isEmpty()) 
     return true; 
} 

Изменить: Существует минимальный & максимальное количество потоков данных в качестве параметра.

+0

Вам нужно полностью переосмыслить это. В ваших вычислениях нет ничего общего с «лучшей скоростью передачи», так что вы в конечном итоге создадите огромное количество потоков, все избивая и конкурируя друг с другом за процессор и сеть и не получая абсолютно никуда. Максимальное количество потоков, которое вы должны использовать, - это пропускная способность канала, деленная на целевую скорость передачи. Вы найдете это число довольно маленьким. – EJP

ответ

1

Это довольно интересная проблема в том, что у вас есть переменная производительность при передаче файлов (чтение/запись может происходить на локальном диске или в каком-то удаленном месте, а не все диски/сетевые местоположения имеют одинаково эффективную пропускную способность, доступную вообще раз). Я не уверен, что использование успеха/неудачи передачи является хорошим показателем для принятия решения о том, когда вы должны увеличить или уменьшить количество потоков. Я подозреваю, что сбои не произойдут из-за чрезмерной подготовки мест чтения/записи с потоками, они, вероятно, просто замедлятся, что приведет к некоторым дополнительным накладным расходам по сравнению с копированием одного файла за раз. В этом случае вы просто создадите больше потоков, пока не закончите работу с файлами, чтобы копировать или исчерпать память, в зависимости от того, что наступит раньше.

Это, я думаю, вам лучше подойти к проблеме под другим углом. Следующие ограничения в первую очередь, на мой взгляд:

  1. Вы должны прочитать каждый файл из где-то.
  2. Вы должны написать каждый файл до где-нибудь.
  3. Пункты чтения/записи произвольны.
  4. Перед записью вы не можете поместить каждый файл в ОЗУ.
  5. Вы не можете порождать бесконечное количество потоков (при попытке вы исчерпаете память).

С учетом этих ограничений я бы поддержал пару пулов потоков, один для чтения и один для записи, причем каждый пул опросил очередь запросов на чтение/запись. В оптимальном случае новый поток будет генерироваться с каждым новым обнаруженным местоположением чтения/записи, но ограниченным определенным размером, чтобы вы не превышали пределы вашей системной памяти (или некоторую предопределенную приемлемую сумму). Вы также хотите установить некоторый размер буфера, где, если размер файла превышает размер этого буфера, вы будете читать первые N байтов, а затем передайте эти байты в очередь записи перед чтением следующих N байтов. Таким образом, вы можете начать писать большие файлы до места назначения, прежде чем они будут прочитаны полностью, что позволит вам сохранить время и память, необходимые для чтения всего файла перед его записью. Наконец, вы можете ограничить размер очереди записи, чтобы ваша программа не читала больше данных, чем вы можете вписывать в распределение RAM вашей программы в любой момент времени.

+0

Спасибо за ваш ответ, и я тоже не был доволен своим решением. Я забыл упомянуть, что у меня есть параметры, один минимум и максимальный пул. К вашим пунктам 1. & 2 .: Все операции чтения и записи уже реализованы. Я не могу изменить их и просто получить их полный метод передачи как вызываемый. Все местоположения, которые я читаю/пишу, удалены, например. Google Cloud Storage, S3 ... Но также FTP и FTP-сервер я использую отклонения при каждом соединении выше 30. Я начинаю передачу с 25 потоками. Он увеличится до 31, затем сработает и продолжится с 30. – Tharon