2017-01-24 7 views
2

Я пытаюсь использовать сообщения от Kafka, используя реактивную библиотеку kkka. Я получаю одно сообщение распечатана и после этого я получилReactive-Kafka Stream Потребитель: Произошли мертвые буквы

[INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5] [akka://CommittableSourceConsumerMain/system/kafka-consumer-1] Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] from Actor[akka://CommittableSourceConsumerMain/deadLetters] to Actor[akka://CommittableSourceConsumerMain/system/kafka-consumer-1#-1726905274] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 

Это код, который я уверен, выполнение

import akka.actor.ActorSystem 
import akka.kafka.scaladsl.Consumer 
import akka.kafka.{ConsumerSettings, Subscriptions} 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Sink 
import org.apache.kafka.clients.consumer.ConsumerConfig 
import play.api.libs.json._ 
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer} 

object CommittableSourceConsumerMain extends App { 

    implicit val system = ActorSystem("CommittableSourceConsumerMain") 
    implicit val materializer = ActorMaterializer() 
    val consumerSettings =ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer).withBootstrapServers("localhost:9092").withGroupId("CommittableSourceConsumer").withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 

    val done = 
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) 
     .mapAsync(1) { msg => 
     val record=(msg.record.value()) 
     val data=Json.parse(record) 

     val recordType=data \ "data" \"event" \"type" 

     val actualData=data \ "data" \ "row" 

     if(recordType.as[String]=="created"){ 
      "Some saving logic" 
     } 

     else{ 

     "Some logic" 

     } 
     msg.committableOffset.commitScaladsl() 
     } 
     .runWith(Sink.ignore) 
} 

ответ

1

я, наконец, понял, решение. Из-за исключения времени выполнения в потоке возвращается Future отказа, который немедленно прекращает поток. Akka-stream не предоставляет или не отображает исключение во время выполнения. Чтобы знать об исключении

done.onFailure{ 
     case NonFatal(e)=>println(e) 
     } 

Исключение было в блоке if-else. Также можно использовать стратегию актера для возобновления потока, если происходит исключение.

+0

Я также сталкиваюсь с той же проблемой. не могли бы вы дать мне знать, как использовать Стратегию Актера, чтобы возобновить поток. Благодарю. – Rajesh