2009-11-18 7 views
27

У меня есть блокирующая очередь объектов.java BlockingQueue не имеет блокировки заглянуть?

Я хочу написать поток, который блокируется до тех пор, пока в очереди не будет объекта. Подобно функциональности, предоставляемой BlockingQueue.take().

Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто заглянуть() и не удалить объект. Я хочу удалить объект, только если я успешно обработаю его.

Итак, я хотел бы заблокировать функцию peek(). В настоящее время peek() просто возвращает, если очередь пуста в соответствии с javadocs.

Я что-то упустил? Есть ли другой способ достичь этой функциональности?

EDIT:

Любых мысли о том, если я использовал только безопасную очередь потоков и заглянул и спал вместо этого?

public void run() { 
    while (!__exit) { 
     while (__queue.size() != 0) { 
      Object o = __queue.peek(); 
      if (o != null) { 
       if (consume(o) == true) { 
        __queue.remove(); 
       } else { 
        Thread.sleep(10000); //need to backoff (60s) and try again 
       } 
      } 
     } 
     Thread.sleep(1000); //wait 1s for object on queue 
    } 
} 

Обратите внимание, что у меня есть только одна потребительская нить и одна (отдельная) нить производителя. Я думаю, это не так эффективно, как использование BlockingQueue ... Любые комментарии оценены.

ответ

14

Вы можете использовать LinkedBlockingDeque и физически удалить элемент из очереди (с помощью takeLast()), но снова заменить его на конец очереди если обработка терпит неудачу с помощью putLast(E e). Между тем ваши «продюсеры» добавят элементы к перед началом очереди, используя putFirst(E e).

Вы всегда можете инкапсулировать это поведение в пределах вашего собственного Queue реализации и обеспечить blockingPeek() метод, который выполняет takeLast() следуют putLast() за кулисами на подстилающей LinkedBlockingDeque. Следовательно, с точки зрения вызывающего клиента элемент никогда не удаляется из вашей очереди.

+2

Это хорошее предложение. Единственная проблема, которую я вижу здесь, заключается в том, что если очередь заполняется, пока я обрабатываю элемент, то я не буду в очереди возвращать текущий элемент. – rouble

+0

Вы можете обойти это, используя дополнительную синхронизацию в своей реализации обертки, поэтому сделайте операцию take + put атомарной. Вы также можете использовать неограниченную очередь. – Adamski

+2

Я бы рекомендовал не удалять и повторно добавлять, потому что тогда у вас возникли проблемы с отображением изменений состояния очереди в другие потоки. Может быть, использовать занятый опрос для реализации peek(). Или используйте семафор, привязанный к очереди в вашей обертке, если вы не хотите опроса. –

1

Не могли бы вы также добавить очередь слушателя событий в свою блокирующую очередь, а затем, когда что-то добавлено в (блокирующую) очередь, отправьте событие своим слушателям? Вы можете заблокировать поток, пока не будет вызван метод actionPerformed.

2

Единственное, что я знаю о том, что делает это BlockingBuffer в Apache Commons Collections:

Если либо получить или удалить вызывается пустой буфер, вызывающий поток ожидает уведомления о том, что добавить или addAll операция завершена.

get() эквивалентно peek() и Buffer можно сделать, чтобы действовать как BlockingQueue путем декорирования UnboundedFifoBuffer с BlockingBuffer

0

выглядит как сама BlockingQueue не имеет функциональность, в которой нужно указать.

Возможно, я попытаюсь немного заново сфотографировать проблему: что бы вы сделали с объектами, которые вы не можете «правильно обработать»? Если вы просто оставите их в очереди, вам придется в какой-то момент вытащить их и разобраться с ними. Я бы рекомендовал либо выяснить, как их обрабатывать (обычно, если queue.get() дает какое-либо недопустимое или плохое значение, вы, вероятно, можете просто отбросить его на пол) или выбрать другую структуру данных, чем FIFO.

1

Быстрый ответ: нет, на самом деле нет способа заблокировать просмотр, бар, реализующий блокирующую очередь с блокировкой peek() самостоятельно.

Я что-то не хватает?

заглядывать() может быть хлопотно с параллелизмом -

  • Если вы не можете обработать ваш заглянуть() 'd сообщение - оно будет оставаться в очереди, если у вас есть несколько потребителей.
  • Кто собирается получить этот объект из очереди, если вы не можете его обработать?
  • Если у вас несколько потребителей, вы получаете условие гонки между вами peek() и другим потоком, также обрабатывая элементы, что приводит к дублированию обработки или к худшему.

Похоже, вы могли бы быть лучше на самом деле удаление элемента и обработать его с помощью Chain-of-responsibility pattern

Edit: Re: последний пример: Если у вас есть только 1 потребитель, вы никогда не избавиться от объект в очереди - если он не обновляется в среднем времени - в этом случае вам лучше быть очень осторожным в отношении безопасности потоков и, вероятно, не должно поместить элемент в очередь в любом случае.

6

Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто заглянуть() и не удалить объект. Я хочу удалить объект, только если я успешно обработаю его.

В целом, он не является потокобезопасным. Что делать, если после peek() и определить, что объект может быть успешно обработан, но перед тем, как вы удалите и обработаете take(), другой поток принимает этот объект?

+0

IMHO вы можете решить эту проблему, обернув и поместив некоторую синхронизацию. логика – yerlilbilgin