2015-03-04 3 views
2

Я программирую в java, и у меня есть List<LogEntry> log, который делится между разными потоками.Многие писатели один читатель без параллелизма

Эти «писатели» потоки уже синхронизированы между ними, так что только один поток в момент времени может добавлять или удалять элементы из log

Однако, из-распределенного алгоритма, который я пытаюсь реализовать, там являются частью журнала, которые являются «безопасными», что означает, что они не могут быть изменены ни писателями, ни читателем (которые я введю ниже). Эта часть log указывается полем int committedIndex, которое инициализируется до 0 и монотонно возрастает.

В заключение авторы изменить элементы в log в диапазоне (commitIndex,log.size()), в то время как есть читатель, который получают элементы в log, содержащихся в диапазоне [0,commitIndex]. Читатель начинает читать с первой записи, затем читает следующую, пока не достигнет log.get(commitIndex), затем останавливается и уходит спать до commitIndex. Он обновляет поле lastApplied, которое инициализируется 0 и монотонно возрастает, чтобы запомнить последние logEntry, которые он читал перед сном.

Как вы можете видеть, нет необходимости синхронизировать читателя и писателей, поскольку они имеют доступ к различным частям log.

Вопрос: как я могу «проснуться» от читающей нити, когда увеличивается commitIndex? Мне нужно что-то вроде этого (выполняемую писатель):

if(commitIndex is updated) 
{ 
    //wake up reader 
} 

и читатель:

public void run() { 
    while(true){ 
     //go to sleeep... 
     //now the reader is awaken! 
     while(lastApplied<commitIndex){ 
      //do something with log.get(lastApplied) 
      lastApplied++; 
     } 
    } 

Очевидно, что я очень упрощена мой код для того, чтобы дать вам понять, что я хочу, как лучше, возможно, извините, если это недостаточно ясно (и не стесняйтесь спрашивать меня об этом). Благодаря!

+0

Из [apidocs] (http://docs.oracle.com/javase/7/docs/api/java/util/ArrayList.html): «Если несколько потоков обращаются к списку связанных/массивов одновременно, и по крайней мере один из потоков изменяет список структурно, он должен быть синхронизирован извне». Обратите внимание на акцент на _must_: ваше предположение о том, что чтение с более низкого индекса при записи на более высокий индекс является полным, неверно. – vanOekel

+0

Список может быть изменен структурно только в разделе, который не используется Reader. Таким образом, нет никакого обмена между Reader и Writers, но только между Writers (которые действительно синхронизированы) – justHelloWorld

ответ

0

Используйте общий LinkedBlockingQueue<Integer> (среди читателей и всех авторов), чтобы каждый писатель сигнализировать читателя, что переменная commitIndex была изменена:

писателей:

if (commitIndex is updated) { 
    // wake up reader 
    this.queue.add(commitIndex); 
} 

Считыватель:

public void run() { 
    while (true) { 

     // take() puts this thread to sleep until a writer calls add() 
     int commitIndex = this.queue.take(); 

     // now the reader is awaken! 
     while (lastApplied < commitIndex) { 
      // do something with log.get(lastApplied) 
      lastApplied++; 
     } 
    } 
} 

Здесь я использовал атрибут queue, который должен соответствовать одному и тому же экземпляру LinkedBlockingQueue, для читателя и всех писателей.

Примечание: Исключение обработки в качестве упражнения.

+0

Ваше решение нецелесообразно для проблемы по нескольким причинам: 1. общий объект - это список объектов LogEntry (как я указал в начале вопроса), а не список int. Таким образом, очередь (в конечном счете) должна содержать объекты «LogEntry», а не «Integer». 2. Я заметил, что я не писал, что ни читатель не может изменить «безопасную» (зафиксированную) часть журнала, поэтому использование очереди (где операция 'take() удаляет голову очереди) не позволил. В зависимости от меня нужна структура, которая реализует операцию произвольного доступа (например, 'List' или' Array'). BTW thanks :) – justHelloWorld

+0

@justHelloWorld Я не предлагаю вам менять свой список 'LogEntry', продолжать использовать этот. Эта очередь 'Integer' предназначена только для сигнализации читателей. –

+1

Прошу прощения за мой комментарий выше, я не получил ваше решение. Я думаю, что ваше решение совершенно правильно, но я считаю, что @marchew еще один элегантный, поскольку он не вводит никакой новой структуры поддержки, если вы не убедите меня иначе. – justHelloWorld

0

попробовать это:

if(commitIndex is updated) 
{ 
    //wake up reader 
    synchronized(reader) 
    { 
    reader.notify(); 
    } 
} 
+0

Некоторые примеры wait() и notify(): http://www.programcreek.com/2009/02/notify-and- wait-example/ – marchew

+0

Хорошо, но тогда метод 'run()' читателя должен быть объявлен как синхронизированный, не так ли? – justHelloWorld

 Смежные вопросы

  • Нет связанных вопросов^_^