2013-07-29 2 views
0

Я пытаюсь использовать прерыватель для обработки сообщений. Мне нужны две фазы обработки. т.е. две группы обработчиков, работающих в пуле рабочих, как это (я думаю):Проблемы с производительностью disruptor при использовании двух слоев нескольких обработчиков в пуле

disruptor. 
handleEventsWithWorkerPool(
    firstPhaseHandlers) 
.thenHandleEventsWithWorkerPool(
    secondPhaseHandlers); 

при использовании приведенного выше кода, если я поместить более одного работника в каждой группе, производительность ухудшается. что означает, что тонны CPU были потрачены впустую за то же самое количество работы.

Я попытался настроить размер буферного буфера (который я уже видел, влияет на производительность), но в этом случае это не помогло. так что я делаю что-то не так, или это настоящая проблема?


Я прилагаю полную демо-версию проблемы.

import java.util.ArrayList; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.atomic.AtomicLong; 
import com.lmax.disruptor.EventFactory; 
import com.lmax.disruptor.EventTranslatorOneArg; 
import com.lmax.disruptor.WorkHandler; 
import com.lmax.disruptor.dsl.Disruptor; 

final class ValueEvent { 
private long value; 

public long getValue() { 
    return value; 
} 

public void setValue(long value) { 
    this.value = value; 
} 

public final static EventFactory<ValueEvent> EVENT_FACTORY = new  EventFactory<ValueEvent>() { 
    public ValueEvent newInstance() { 
     return new ValueEvent(); 
    } 
}; 
} 

class MyWorkHandler implements WorkHandler<ValueEvent> { 

AtomicLong workDone; 
public MyWorkHandler (AtomicLong wd) 
{ 
    this.workDone=wd; 
} 
public void onEvent(final ValueEvent event) throws Exception { 

    workDone.incrementAndGet(); 
} 

} 

class My2ndPahseWorkHandler implements WorkHandler<ValueEvent> { 


AtomicLong workDone; 
public My2ndPahseWorkHandler (AtomicLong wd) 
{ 
    this.workDone=wd; 
} 

public void onEvent(final ValueEvent event) throws Exception { 

    workDone.incrementAndGet(); 
} 

} 

class MyEventTranslator implements EventTranslatorOneArg<ValueEvent, Long> { 

@Override 
public void translateTo(ValueEvent event, long sequence, Long value) { 

    event.setValue(value); 

} 

} 

public class TwoPhaseDisruptor { 

static AtomicLong workDone=new AtomicLong(0); 

@SuppressWarnings("unchecked") 
public static void main(String[] args) { 

    ExecutorService exec = Executors.newCachedThreadPool(); 

    int numOfHandlersInEachGroup=Integer.parseInt(args[0]); 
    long eventCount=Long.parseLong(args[1]); 
    int ringBufferSize=2 << (Integer.parseInt(args[2])); 


    Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>(
      ValueEvent.EVENT_FACTORY, ringBufferSize, 
      exec); 

    ArrayList<MyWorkHandler> handlers = new ArrayList<MyWorkHandler>(); 
    for (int i = 0; i < numOfHandlersInEachGroup ; i++) { 

     handlers.add(new MyWorkHandler(workDone)); 
    } 

    ArrayList<My2ndPahseWorkHandler > phase2_handlers = new ArrayList<My2ndPahseWorkHandler >(); 
    for (int i = 0; i < numOfHandlersInEachGroup; i++) { 
     phase2_handlers.add(new My2ndPahseWorkHandler(workDone)); 
    } 

    disruptor 
      .handleEventsWithWorkerPool(
        handlers.toArray(new WorkHandler[handlers.size()])) 
      .thenHandleEventsWithWorkerPool(
        phase2_handlers.toArray(new WorkHandler[phase2_handlers.size()])); 

    long s = (System.currentTimeMillis()); 
    disruptor.start(); 

    MyEventTranslator myEventTranslator = new MyEventTranslator(); 
    for (long i = 0; i < eventCount; i++) { 
     disruptor.publishEvent(myEventTranslator, i); 
    } 

    disruptor.shutdown(); 
    exec.shutdown(); 
    System.out.println("time spent "+ (System.currentTimeMillis() - s) + "  ms"); 
    System.out.println("amount of work done "+ workDone.get()); 
} 
} 

попробуйте запустить приведенный выше пример с 1 нить в каждой группе

1 100000 7 

на моем компьютере он дал

time spent 371 ms 
amount of work done 200000 

Тогда попробуйте его с 4-мя нитями в каждой группе

4 100000 7 

который на моем компьютере Uter дал

time spent 9853 ms 
amount of work done 200000 

во время запуска ЦПУ находится в использовании

+0

также - сколько ядер процессора у вас на самом деле? – jasonk

+0

Я думаю, что его 4 ядра – user2391480

ответ

2

100% Вы, кажется ложным Разделяя AtomicLong между нитями/ядрами. Я попробую, когда у меня будет больше времени с демонстрацией, однако гораздо лучше будет иметь каждый WorkHandler с частной переменной, которой владеет каждый поток (либо собственный AtomicLong, либо предпочтительно простой).


Обновление:

Если вы измените нарушающую линию:

Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>(
     ValueEvent.EVENT_FACTORY, ringBufferSize, 
     exec, 
     com.lmax.disruptor.dsl.ProducerType.SINGLE, 
     new com.lmax.disruptor.BusySpinWaitStrategy()); 

вы получите намного лучшие результаты:

[email protected]:~/code/stackoverflow$ java -cp disruptor-3.1.1.jar:. TwoPhaseDisruptor 4 100000 1024 
time spent 2728  ms 
amount of work done 200000 

Я рассмотрел код и попытались исправить ложное совместное использование, но обнаружил небольшое улучшение. Именно тогда я заметил на своем 8core, что процессоры были нигде около 100% (даже для теста с четырьмя рабочими). Из этого я решил, по крайней мере, что стратегия уступающего/вращающегося ожидания приведет к снижению латентности, если у вас есть процессор для записи.

Просто убедитесь, что у вас есть не менее 8 ядер (для обработки вам потребуется 8, плюс один для публикации сообщений).

+0

круто. не знаю, я могу выбрать стратегию. Я действительно попробовал YieldingWaitStrategy, который дал наилучшие результаты на моем оборудовании (линейное масштабирование). – user2391480