Я пытаюсь выполнить некоторую выборочную реализацию блокирующей очереди с массивом с фиксированной длиной массивов байтов. Я не удаляю опрошенные элементы, поэтому я скорректировал метод put для возврата массива байтов, чтобы он мог быть написан напрямую (поток производителя использует MappedByteBuffer для записи непосредственно в этот массив байтов). Я добавил метод «commitPut()», чтобы просто увеличить счетчики и установить массивы «lengths». (если несколько потоков будут писать, это могут быть проблемы параллелизма, но я знаю, что пишет только один поток).Ошибка блокировки очереди блокировки
Ниже приведено то, что у меня есть. Он работает, если я отлаживаю шаг за шагом, но если я «запустил», похоже, что он сталкивается с некоторыми проблемами с блокировкой. Я скопировал, разделил и скорректировал код ArrayBlockingQueue. Может ли кто-нибудь с лучшими знаниями посмотреть класс и рассказать мне, что я делаю неправильно, или как это сделать лучше (например, написать directy для буферизации и задать длину массива и счетчиков на том же шаге)?
public class ByteArrayBlockingQueue {
private final int[] lens; // array to valid lengths
private final byte[][] items; // array of byte arrays
private int takeIndex = 0;
private int putIndex = 0;
private int count = 0;
public volatile int polledLen = 0; // lenght of last polled byte array
private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
final int inc(int i) {
return (++i == items.length)? 0 : i;
}
public ByteArrayBlockingQueue(int capacity, int size, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new byte[capacity][size];
this.lens = new int[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public byte[] put() throws InterruptedException {
final byte[][] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
//insert(e, len);
return items[putIndex];
} finally {
lock.unlock();
}
}
public void commitPut(int lenBuf) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
lens[putIndex] = lenBuf;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public byte[] poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)
return null;
final byte[][] items = this.items;
final int[] lens = this.lens;
byte[] e = items[takeIndex];
this.polledLen = lens[takeIndex];
//items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return e;
} finally {
lock.unlock();
}
}
}
Можете ли вы описать «проблемы с блокировкой», с которыми вы сталкиваетесь? – jop
Кажется, что «потоки производителя» блокируют «потребительскую нить». Я не совсем получаю эти блокировки, хотя они кажутся мне полезными (почти тот же принцип, что и другие встроенные блокирующие очереди). Если я пройду через отладчик, установив точки останова в «poll» и «put» - он работает. Я вижу, что растровые изображения (байты в массиве байтов) декодируются должным образом. Как только я «забегаю», он терпит неудачу (байты не могут быть декодированы). Если я попытаюсь отладить значение опроса, то наблюдатель выражений будет терпеть неудачу с некоторой ошибкой «дочернего обновления», поэтому я предполагаю, что это проблема блокировки. – hpet