Я пытаюсь использовать прерыватель для обработки сообщений. Мне нужны две фазы обработки. т.е. две группы обработчиков, работающих в пуле рабочих, как это (я думаю):Проблемы с производительностью disruptor при использовании двух слоев нескольких обработчиков в пуле
disruptor.
handleEventsWithWorkerPool(
firstPhaseHandlers)
.thenHandleEventsWithWorkerPool(
secondPhaseHandlers);
при использовании приведенного выше кода, если я поместить более одного работника в каждой группе, производительность ухудшается. что означает, что тонны CPU были потрачены впустую за то же самое количество работы.
Я попытался настроить размер буферного буфера (который я уже видел, влияет на производительность), но в этом случае это не помогло. так что я делаю что-то не так, или это настоящая проблема?
Я прилагаю полную демо-версию проблемы.
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
final class ValueEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
public ValueEvent newInstance() {
return new ValueEvent();
}
};
}
class MyWorkHandler implements WorkHandler<ValueEvent> {
AtomicLong workDone;
public MyWorkHandler (AtomicLong wd)
{
this.workDone=wd;
}
public void onEvent(final ValueEvent event) throws Exception {
workDone.incrementAndGet();
}
}
class My2ndPahseWorkHandler implements WorkHandler<ValueEvent> {
AtomicLong workDone;
public My2ndPahseWorkHandler (AtomicLong wd)
{
this.workDone=wd;
}
public void onEvent(final ValueEvent event) throws Exception {
workDone.incrementAndGet();
}
}
class MyEventTranslator implements EventTranslatorOneArg<ValueEvent, Long> {
@Override
public void translateTo(ValueEvent event, long sequence, Long value) {
event.setValue(value);
}
}
public class TwoPhaseDisruptor {
static AtomicLong workDone=new AtomicLong(0);
@SuppressWarnings("unchecked")
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
int numOfHandlersInEachGroup=Integer.parseInt(args[0]);
long eventCount=Long.parseLong(args[1]);
int ringBufferSize=2 << (Integer.parseInt(args[2]));
Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>(
ValueEvent.EVENT_FACTORY, ringBufferSize,
exec);
ArrayList<MyWorkHandler> handlers = new ArrayList<MyWorkHandler>();
for (int i = 0; i < numOfHandlersInEachGroup ; i++) {
handlers.add(new MyWorkHandler(workDone));
}
ArrayList<My2ndPahseWorkHandler > phase2_handlers = new ArrayList<My2ndPahseWorkHandler >();
for (int i = 0; i < numOfHandlersInEachGroup; i++) {
phase2_handlers.add(new My2ndPahseWorkHandler(workDone));
}
disruptor
.handleEventsWithWorkerPool(
handlers.toArray(new WorkHandler[handlers.size()]))
.thenHandleEventsWithWorkerPool(
phase2_handlers.toArray(new WorkHandler[phase2_handlers.size()]));
long s = (System.currentTimeMillis());
disruptor.start();
MyEventTranslator myEventTranslator = new MyEventTranslator();
for (long i = 0; i < eventCount; i++) {
disruptor.publishEvent(myEventTranslator, i);
}
disruptor.shutdown();
exec.shutdown();
System.out.println("time spent "+ (System.currentTimeMillis() - s) + " ms");
System.out.println("amount of work done "+ workDone.get());
}
}
попробуйте запустить приведенный выше пример с 1 нить в каждой группе
1 100000 7
на моем компьютере он дал
time spent 371 ms
amount of work done 200000
Тогда попробуйте его с 4-мя нитями в каждой группе
4 100000 7
который на моем компьютере Uter дал
time spent 9853 ms
amount of work done 200000
во время запуска ЦПУ находится в использовании
также - сколько ядер процессора у вас на самом деле? – jasonk
Я думаю, что его 4 ядра – user2391480