2017-02-16 7 views
4

Спарк Streaming Проблема с Кафки DirectStream:искра потоковом утверждение не удалось: Не удалось получить записи для искрового исполнитель-а-группа А-тему 7 244723248 после опроса 4096

искра потоковом утверждение не удалось: Не удалось получить записи для искровых исполнитель-а-а-группы темы 7 244723248 после опроса для 4096

Пытались:

1) Регулировка возрастающих spark.streaming.kafka.consumer.poll.ms

-

- от 512 до 4096, менее не удалось, но даже 10s отказавшего все еще существует

2) Отрегулировать исполнителю память от 1G до 2G

- частично работать, намного меньше, не удалось

3) https://issues.apache.org/jira/browse/SPARK-19275

- до сих пор получил сбой при потоковой длительностей все меньше, чем 8s ("session.timeout.ms" -> "30000")

4) Постарайся Свечи 2.1

- проблема все еще существует


с Scala 2.11.8, Кафка версия: 0.10.0.0, версия Spark: 2.0.2

Свечи конфиги

.config("spark.cores.max", "4") 
.config("spark.default.parallelism", "2") 
.config("spark.streaming.backpressure.enabled", "true") 
.config("spark.streaming.receiver.maxRate", "1024") 
.config("spark.streaming.kafka.maxRatePerPartition", "256") 
.config("spark.streaming.kafka.consumer.poll.ms", "4096") 
.config("spark.streaming.concurrentJobs", "2") 

с помощью искры -streaming-kafka-0-10-assembly_2.11-2.1.0.jar

Количество ошибок:

at scala.Predef$.assert(Predef.scala:170) 
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74) 
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228) 
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194) 
at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:194) 
... 
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109) 
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:108) 
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:142) 
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:108) 
... 
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) 
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:99) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

Потеря 1% + блоков данных от Kafka с этой неисправностью: (помощь PLS!

ответ

2

Текущее решение:

  • Увеличение num.network.threads в Кафка/Config/server.properties, по умолчанию 3
  • Увеличение spark.streaming.kafka.consumer.poll.ms значение ~! большой один ... без конфигурации spark.streaming.kafka.consumer.poll.ms, он с помощью spark.network.timeout, который 120s - вызывает некоторые проблемы
  • Необязательный шаг: Уменьшение «Макс .poll.records ", значение по умолчанию - 500
  • Дополнительный шаг: использовать Future {} для параллельной работы по времени

 Смежные вопросы

  • Нет связанных вопросов^_^