У меня есть искровой потоковое приложение, которое выглядит следующим образом:Spark Streaming: Как добавить дополнительные разделы в мой DStream?
val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD(rdd => {
if (!rdd.isEmpty){
val kafkaDF = sqlContext.read.json(rdd)
kafkaDF.foreachPartition(
i =>{
createConnection()
i.foreach(
row =>{
connection.sendToTable()
}
)
closeConnection()
}
)
И, я запускаю его на кластере пряжи с использованием
spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....
Когда я пытаюсь войти kafkaDF.rdd.partitions.size
, результат получается быть '1' или '5' в основном. Я смущен, возможно ли контролировать количество разделов моего DataFrame? KafkaUtils.createStream
, похоже, не принимает никаких параметров, связанных с количеством разделов, которые я хочу для rdd. Я пробовал kafkaDF.rdd.repartition(int)
, но он тоже не работает.
Как добиться большего параллелизма в моем коде? Если мой подход ошибочен, каков правильный способ его достижения?
Вы попробовали решение? Это сработало для вас? – marios
Я добавил больше потребителей и больше разделов на тему Кафки. Теперь производительность лучше. Дайте мне знать – void