2016-01-24 3 views
2

Я работал с примером рабочего потока и пытался использовать разные реализации WaitStrategy. Когда я пытаюсь использовать TimeoutBlockingWaitStrategy, я получаю ошибку. Вот программа и стек вызовов. Исключение NullPointerException при использовании TimeoutBlockingWaitStrategy в Disruptor

package org.lmax.experiment.test; 
import java.util.Random; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.ThreadFactory; 
import java.util.concurrent.TimeUnit; 

import org.junit.After; 
import org.junit.Before; 
import org.junit.Test; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import com.lmax.disruptor.BlockingWaitStrategy; 
import com.lmax.disruptor.BusySpinWaitStrategy; 
import com.lmax.disruptor.EventFactory; 
import com.lmax.disruptor.EventTranslatorOneArg; 
import com.lmax.disruptor.RingBuffer; 
import com.lmax.disruptor.SleepingWaitStrategy; 
import com.lmax.disruptor.TimeoutBlockingWaitStrategy; 
import com.lmax.disruptor.WorkHandler; 
import com.lmax.disruptor.YieldingWaitStrategy; 
import com.lmax.disruptor.dsl.Disruptor; 
import com.lmax.disruptor.dsl.ProducerType; 

@SuppressWarnings("unused") 
public class MultipleWorkerPoolsTest { 

private static final Logger log = LoggerFactory 
     .getLogger(MultipleWorkerPoolsTest.class); 
/**
 * Mutable event that carries a single string payload.
 */
private static class StringEvent {
    /** Payload; stays {@code null} until {@link #setValue(String)} is called. */
    private String value;

    /** Returns the current payload, or {@code null} if none has been set. */
    public String getValue() {
        return value;
    }

    /** Replaces the payload with {@code value}. */
    public void setValue(String value) {
        this.value = value;
    }

    /** Returns the raw payload, which is {@code null} while no value has been set. */
    @Override
    public String toString() {
        return value;
    }
}

/**
 * Work handler that prints and logs each event it receives, sleeps for a random
 * time below ten seconds, and then prints a completion message.
 */
private static class Worker implements WorkHandler<StringEvent> {

    private static final Logger log = LoggerFactory
            .getLogger(MultipleWorkerPoolsTest.Worker.class);

    /** Label included in every message this worker prints or logs. */
    private final String workerId;

    /** Source of the per-event sleep duration. */
    private final Random r = new Random();

    /**
     * @param workerId label used to identify this worker in the output
     */
    public Worker(String workerId) {
        this.workerId = workerId;
    }

    /**
     * Reports the event, sleeps for a random number of milliseconds in
     * {@code [0, 10000)}, then reports completion.
     *
     * @param event the event to handle
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public void onEvent(StringEvent event) throws Exception {
        System.out.println("{" + workerId + "} got {" + event + "}");
        log.info("{} got {}", workerId, event);
        // Primitive int: Thread.sleep takes a long, so boxing to Integer served no purpose.
        int timeToSleep = r.nextInt(10000);
        Thread.sleep(timeToSleep);
        System.out.println("{" + workerId + "} Completed {" + event + "}");
    }
}

/** Event factory that produces empty {@code StringEvent} instances. */
private static class StringEventFactory implements EventFactory<StringEvent> {

    /** Creates a new event whose value has not been set yet. */
    @Override
    public StringEvent newInstance() {
        final StringEvent blank = new StringEvent();
        return blank;
    }
}

/**
 * Translator that stamps an event with its sequence number followed by the
 * supplied text.
 */
private static class StringEventTranslator implements EventTranslatorOneArg<StringEvent, String> {

    /**
     * Sets the event's value to {@code "event <sequence>: <arg0>"}.
     *
     * @param event    the event to populate
     * @param sequence the sequence number written into the value
     * @param arg0     the text appended after the sequence number
     */
    @Override
    public void translateTo(StringEvent event, long sequence, String arg0) {
        final StringBuilder text = new StringBuilder("event ");
        text.append(sequence);
        text.append(": ");
        text.append(arg0);
        event.setValue(text.toString());
    }
}

private Disruptor<StringEvent> disruptor; 
private ExecutorService executor; 
/**
 * Creates the daemon-thread executor and the multi-producer disruptor, then
 * registers a single pool of twelve workers as the event consumers.
 */
@Before
public void before() {
    // The same count sizes both the executor and the worker pool.
    final int workerCount = 12;

    executor = Executors.newFixedThreadPool(workerCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("worker_1_" + t.getId());
            return t;
        }
    });

    // NOTE(review): with Disruptor 3.3.4 and earlier this wait strategy is reported to make
    // WorkProcessor.run throw NullPointerException; reportedly fixed in 3.3.5 - confirm the
    // dependency version before relying on this setup.
    disruptor = new Disruptor<StringEvent>(new StringEventFactory(), 64, executor,
            ProducerType.MULTI, new TimeoutBlockingWaitStrategy(1000, TimeUnit.NANOSECONDS));

    Worker[] workerArray = new Worker[workerCount];
    for (int i = 0; i < workerCount; i++) {
        workerArray[i] = new Worker("worker_" + i);
    }

    disruptor.handleEventsWithWorkerPool(workerArray);
}

/**
 * Stops the worker executor, allowing running tasks a bounded time to finish
 * before interrupting them.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
@After
public void after() throws InterruptedException {
    executor.shutdown();
    // A zero timeout returns immediately without waiting, so use a real grace period
    // and force termination if the tasks have not finished by then.
    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        executor.shutdownNow();
    }
}
/**
 * Publishes 10000 events from each of two producer threads (one without pauses,
 * one pausing 10 ms after every event) and then shuts the disruptor down.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for a producer
 * @throws ExecutionException   if a producer task fails
 */
@Test
public void test1() throws InterruptedException, ExecutionException {
    // Only two tasks are ever submitted, so two threads suffice.
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    try {
        disruptor.start();

        Future<?> future = executorService.submit(newProducer(0));
        Future<?> future1 = executorService.submit(newProducer(10));
        future1.get();
        future.get();

        System.out.println("Disruptor shutting down");
        log.info("Disruptor shutting down");
        // Disruptor.shutdown() is documented to wait until the published events have been
        // processed, so no fixed sleep is needed before calling it.
        disruptor.shutdown();
        System.out.println("Disruptor shut down complete");

        log.info("Disruptor shutdown");
    } finally {
        // Release the producer threads even when a producer failed.
        executorService.shutdownNow();
    }
}

/**
 * Creates a producer task that publishes 10000 events named after the thread running it.
 *
 * @param pauseMillis pause after each publish, in milliseconds; zero or less publishes
 *                    without pausing
 * @return the producer task
 */
private Runnable newProducer(final long pauseMillis) {
    return new Runnable() {
        @Override
        public void run() {
            StringEventTranslator translator = new StringEventTranslator();
            RingBuffer<StringEvent> ringBuffer = disruptor.getRingBuffer();
            Thread current = Thread.currentThread();
            String eventName = "hello" + current.getName() + "." + current.getId();
            for (int i = 0; i < 10000; i++) {
                ringBuffer.publishEvent(translator, eventName);
                if (pauseMillis > 0) {
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop publishing instead of
                        // swallowing the interruption and carrying on.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    };
}


} 

Ошибка следующая

Exception in thread "worker_1_9" Exception in thread "worker_1_12" Exception in thread "worker_1_16" Exception in thread "worker_1_20" Exception in thread "worker_1_17" Exception in thread "worker_1_14" java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Exception in thread "worker_1_10" Exception in thread "worker_1_19" Exception in thread "worker_1_11" Exception in thread "worker_1_18" java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Exception in thread "worker_1_13" java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Exception in thread "worker_1_15" java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
java.lang.NullPointerException 
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Хотелось бы знать, есть ли ошибка в моём коде. Буду очень признателен за любую помощь. Также было бы здорово, если бы кто-нибудь объяснил назначение TimeoutBlockingWaitStrategy в Disruptor.

ответ

2

Это ошибка в Disruptor версии 3.3.4 и более ранних; она будет исправлена в версии 3.3.5.

+0

Спасибо, Майк. Попробуем с исправлением. – Harry