2009-07-31 5 views
26

У меня есть классическая проблема потоковой передачи событий во входящую очередь второго потока. Только в этот раз меня очень интересует производительность. Чего я хочу достичь:Параллельная и блокирующая очередь в Java

  • Я хочу одновременный доступ к очереди, нажатие на устройство, приемник.
  • Когда очередь пуста, я хочу, чтобы потребитель блокировал очередь, ожидая производителя.

Моя первая идея состояла в том, чтобы использовать LinkedBlockingQueue, но вскоре я понял, что это не одновременно, а производительность пострадала. С другой стороны, теперь я использую ConcurrentLinkedQueue, но все же я оплачиваю wait()/notify() на каждую публикацию. Поскольку потребитель, обнаружив пустую очередь, не блокирует, мне нужно синхронизировать и wait() на блокировке. С другой стороны, производитель должен получить этот замок и notify() при каждой публикации. В целом, я оплачиваю sycnhronized (lock) {lock.notify()} в каждой отдельной публикации, даже если это не нужно.

Я думаю, что это необходимо здесь, это очередь, которая является блокирующей и параллельной. Я полагаю, что операция push() работает так же, как в ConcurrentLinkedQueue, с дополнительным notify() к объекту, когда выдвинутый элемент является первым в списке. Такая проверка, я считаю, уже существует в ConcurrentLinkedQueue, так как нажатие требует соединения со следующим элементом. Таким образом, это будет намного быстрее, чем синхронизация каждый раз на внешней блокировке.

Это что-то вроде этого доступно/разумно?

+0

Почему вы думаете, что java.util.concurrent.LinkedBlockingQueue не является параллельным? Я думаю, что он абсолютно параллелен, и видит его javadoc и исходный код. Но понятия не имею о производительности. – Rorick

+0

См. Также http://stackoverflow.com/questions/1301691/java-queue-implementations-which-one – Vadzim

ответ

11

Я думаю, вы можете придерживаться java.util.concurrent.LinkedBlockingQueue независимо от ваших сомнений. Это одновременно. Хотя, я понятия не имею о его производительности. Возможно, другая реализация BlockingQueue подойдет вам лучше. Их не так уж много, поэтому сделайте тесты производительности и измерения.

+0

Я говорю это только, наблюдая, как моя пропускная способность сокращается примерно в 8 раз по сравнению с ConcurrentLinkedQueue. Я догадался, что это внутреннее запирание всего, чтобы обеспечить безопасность потока. Вы правы, хотя это может быть и худшая производительность по сравнению с ConcurrentLinkedQueue. Какой вид бьет причину ;-) – Yiannis

+1

Какую бы вы ни использовали в очереди, вы собираетесь использовать блокировку для поддержки ожидающей новой записи. (если вы не заняты, подождите) Блокировка действительно не такая дорогостоящая (около 0,5 микросекунды для блокировки), поэтому, если ее проблема с производительностью может возникнуть с вашей конструкцией, например, создать меньше задач/найти способ добавить меньше объектов в очередь/пакет до вашей работы. –

+6

В качестве примечания, если вы хотите увеличить пропускную способность, используйте функцию drainTo() на LinkedBlcokingQueue, мы измерили почти 500% увеличенную пропускную способность в одном из наших приложений, вместо того, чтобы принимать() 'один и один элемент из очереди – nos

2

Адрес list of classes implementing BlockingQueue.

Я бы порекомендовал проверку SynchronousQueue.

Как @Rorick, упомянутый в его комментарии, я считаю, что все эти реализации являются параллельными. Я думаю, что ваши проблемы с LinkedBlockingQueue могут быть неуместными.

+1

SynchronousQueue не то, что я хочу, так как он будет блокировать моего продюсера каждый раз, когда он пытается опубликовать. – Yiannis

+0

SynchronousQueue больше похож на трубу, чем на очередь. Похоже, он не может содержать задачи, ожидающие обработки в нем, только «нажимать» отдельные задачи от производителя к потребителю. – Rorick

+0

Хехе, извините. – jjnguy

5

Я предлагаю вам посмотреть ThreadPoolExecutor newSingleThreadExecutor. Он будет обрабатывать ваши задачи, заказанные для вас, и если вы отправите Callables вашему исполнителю, вы также сможете получить поведение блокировки, которое вы ищете.

3

Я использую ArrayBlockingQueue всякий раз, когда мне нужно передавать данные из одного потока в другой. Использование методов put и take (которые будут блокироваться, если они полны/пусты).

4

Вы можете попробовать LinkedTransferQueue из jsr166: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/

Он отвечает вашим требованиям и имеют меньше накладных расходов на предложение/опроса операций. Как видно из кода, когда очередь не пуста, она использует атомарные операции для опроса элементов. И когда очередь пуста, она вращается в течение некоторого времени и припарковывает поток, если не удастся. Я думаю, что это может помочь в вашем случае.

6

Как и в этом ответе https://stackoverflow.com/a/1212515/1102730, но немного иначе .. Я закончил с использованием ExecutorService. Вы можете создать экземпляр, используя Executors.newSingleThreadExecutor(). Мне нужна параллельная очередь для чтения/записи BufferedImages в файлы, а также атомарности с чтением и записью. Мне нужен только один поток, потому что файл IO на порядок быстрее, чем источник, net IO. Кроме того, меня больше беспокоило атомарность действий и правильность, чем производительность, но этот подход также может быть выполнен с несколькими потоками в пуле, чтобы ускорить работу.

Чтобы получить изображение (Try-Уловка Наконец опущены):

Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() { 
    @Override 
     public BufferedImage call() throws Exception { 
      ImageInputStream is = new FileImageInputStream(file); 
      return ImageIO.read(is); 
     } 
    }) 

image = futureImage.get(); 

Чтобы сохранить изображение (Try-Уловка Наконец опущены):

Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() { 
    @Override 
    public Boolean call() { 
     FileOutputStream os = new FileOutputStream(file); 
     return ImageIO.write(image, getFileFormat(), os); 
    } 
}); 

boolean wasWritten = futureWrite.get(); 

Важно отметить, что вы должен очистить и закрыть ваши потоки в блоке finally. Я не знаю, как он работает по сравнению с другими решениями, но он довольно универсален.