2015-10-28 5 views
3

У меня есть файл данных с тысячами строк. Я читаю их и сохраняю их в базе данных. Я хочу многопоточность этого процесса в партиях, например, 50 строк. Когда я прочитал в файле, 10 строк отправляются в ExecutorService.java Многопоточность - подача газа в ExecutorService

ExecutorService executor = Executors.newFixedThreadPool(5);` 

я могу сделать ниже в в время цикла до моих строк конца ....

Future<Integer> future = executor.submit(callableObjectThatSaves10RowsAtOneTime); 

Но я не хочу, чтобы прочитать весь файл в память, если обработка 10 строки занимают время. Я только хочу представить 5, пока один из потоков не вернется, а затем отправлю следующее.

Давайте предположим, что нить занимает 20 секунд, чтобы сохранить 10 записей, я не хочу, чтобы ExecutorService кормить тысячи линий, поскольку процесс чтения продолжает читать и представить ExecutorService

Что является лучшим способ достичь этого?

+0

Возможный дубликат http://stackoverflow.com/questions/1250643/how-to-wait-for-all-threads-to-finish-using-executorservice – Cratylus

+0

@Cratylus это, безусловно, не дубликат вопроса, который вы связаны между собой. ОП задает вопрос о том, как уменьшить количество поставленных задач, чтобы избежать необходимости читать огромный файл сразу, а не как знать, когда все задачи будут завершены. – CodeBlind

ответ

2

Вы можете сделать это с помощью LinkedList<Future<?>>, который хранит фьючерсы, пока вы не достигнете определенного размера. Вот некоторые скелет кода, который вы должны получить большую часть пути туда:

int threads = 5; 
ExecutorService service = Executors.newFixedThreadPool(threads); 
LinkedList<Future<?>> futures = new LinkedList<>(); 

//As long as there are rows to save: 
while(moreRowsLeft()){ 
    //dump another callable onto the queue: 
    futures.addLast(service.submit(new RowSavingCallable()); 

    //if the queue is "full", wait for the next one to finish before 
    //reading any more of the file: 
    while(futures.size() >= 2*threads) futures.removeFirst().get(); 
} 

//All rows have been submitted but some may still be writing to the DB: 
for(Future<?> f : futures) future.get(); 

//All rows have been saved at this point 

Вы можете спросить, почему я позволил количество фьючерсов достичь в два раза количество потоков на компьютере - это позволяет исполнителю услуг потоков для работать над сохранением базы данных, в то время как основной поток создает больше работы. Это может помочь скрыть затраты на ввод-вывод, связанные с получением большего количества доступных для обработки вызовов, в то время как рабочие потоки заняты записью базы данных.

+0

@ CodeBlind- Спасибо! У меня есть вопрос. Не следует начинать удаление из первого элемента LinkList. Первый, который был добавлен в LinkedList, имел бы более высокий шанс вернуться первым после завершения задачи? Можем ли мы подключить ExecutorService.take() к вашему алгоритму и оптимизировать? – Giovanny

+0

@ Гиованный, да, ты прав, опечатка с моей стороны :) Я исправил это. Что касается подключения «ExecutorCompletionService.take()» - кажется мне разумным. Вы можете просто использовать счетчик, чтобы отслеживать, сколько вы отправили, и называть 'take()', когда вы превысили некоторый порог. – CodeBlind

+0

Последний цикл вызовет «get» на фьючерсах, которые мы уже назвали «get». Это не будет делать никакого дополнительного процесса? – Giovanny