2015-03-11 2 views
0

Мы столкнулись с высокой ситуацией использования процессора, когда один из наших EventHandlers сломался.Disruptor park/halt несколько EventHandlers при возникновении исключения

Предположим, у нас есть несколько потребителей (EventHanlders), которые настроены на последовательное выполнение над буфером. Если первый EventHandler выдает исключение, есть ли способ остановить (и просыпать их позже) все остальные EventHandlers.

То, что мы делаем, это положить провальную нить в сон и после того, как мы попытаемся снова использовать одно и то же событие. Но мы заметили, что другие потоки продолжают работать и пытаются читать из RingBuffer даже там, где нет событий для чтения, что повышает процессор за приемлемыми уровнями.

На данный момент я отбрасываю, что это происходит потому, что WaitStrategydisruptor, потому что при нормальных условиях работает как ожидалось. Мы используем BlockingWaitStrategy.

еще несколько объяснений ради примера

INPUT -> [A*] -> [B] -> [C] -> [D] 

Где вход событие опрашиваться из RingBuffer и А, В, С и D являются различными EventHandlers, которые выполняются последовательно. A * - потребитель, бросающий исключение.

Что мы хотим достичь, так это то, что когда потребитель A не может потреблять событие (например, после исключения), метод OnEvent (...) этого потребителя не выходит, но останется в цикле с регулярными снами пытаясь повторить то же самое событие, когда оно просыпается. Между тем все остальные потребители должны быть припаркованы или спать до тех пор, пока А не добьется успеха.

Мы используем версию disruptor 3.3.0.

Я работаю в Google, но не нашел рабочего решения.

Заранее спасибо.

Salva.

ответ

0

Колледж установил, что эта проблема может быть связана с циклом while в методе waitFor в BlockingWaitStrategy.

long availableSequence; 
    while((availableSequence = dependentSequence.get()) < sequence) { 
     barrier.checkAlert(); 
    } 

После нескольких испытаний мы пришли через это возможное решение:

var availableSequence: Long = dependentSequence.get() 

while(availableSequence < sequence) { 
    this.lock.lock() 
    this.lock.unlock() 
    availableSequence = dependentSequence.get() 
} 

availableSequence 

В основном это делает, что один поток блокирует ресурс и с этим мы парковать сиюминутные все другие потребители, избегая высокой загрузки процессора ,

Второй момент - это условие while. Это происходит, когда доступная последовательность (то есть последовательность зависимых потоков) находится ниже текущего порядкового номера. И это происходит только тогда, когда один поток удерживает блокировку, например, когда A выдает исключение.

Мы по-прежнему расследуем, является ли это допустимым решением или могут иметь некоторые нежелательные побочные эффекты.

Любой, хотя об этом добро пожаловать.