2016-08-21 5 views
1

Использование kafka-streams 0.10.0.0, я периодически вижу исключение нулевого указателя в StreamTask при пересылке сообщения. Он варьируется от 10% до 50% вызовов. NPE происходит в этом методе:Периодический NPE в потоке процессора Kafka Streams

public <K, V> void forward(K key, V value) { 
    ProcessorNode thisNode = currNode; 
    try { 
     for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) { 
      currNode = childNode; 
      childNode.process(key, value); 
     } 
    } finally { 
     currNode = thisNode; 
    } 
} 

кажется, что в некоторых случаях thisNode поле пустым. Любая идея, что может быть причиной этого? Трассировка стека ниже.

[ERROR] 2016-08-21 14:50:39.288 [StreamThread-1] StreamedMetricMeter - Forwarding failed 
java.lang.NullPointerException 
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336) ~[kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) ~[kafka-streams-0.10.0.0.jar:?] 
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.forward(AbstractStreamedMetricProcessor.java:552) [classes/:?] 
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:89) [classes/:?] 
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:1) [classes/:?] 
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.process(AbstractStreamedMetricProcessor.java:166) [classes/:?] 
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) [kafka-streams-0.10.0.0.jar:?] 
+1

Вы можете поделиться своим топологическим кодом? Вы попробовали '0.10.0.1'? –

+0

Выяснил это. См. Ответ. Спасибо за попытку. Ошибка программиста была настолько вопиющей, как только я понял, я не тестировал 0.10.0.1. – Nicholas

+0

Кстати, эта проблема также встречается в '0.10.2.1'. Ваш ответ был спасением, спасибо! – Esk

ответ

3

Проблема заключалась в том, что мои ProcessorSupplier s возвращали тот же экземпляр процессора для каждого вызова get. В свою очередь, движок Kafka Streams пытался создать несколько экземпляров процессора, и я, без сомнения, создал многопоточный огонь для мусорных контейнеров. Обратите внимание на аналогичную неосторожность .... ProcessorSupplier.get() должен возвращать новый экземпляр процессора для каждого вызова.

+0

Вы можете принять свой собственный ответ :) –

+0

Хороший звонок. Благодаря ! – Nicholas