У меня есть искровое задание, которое работает, считывает данные из одной таблицы cassandra и возвращает результат обратно в две таблицы с небольшими изменениями. Моя проблема в том, что работа занимает гораздо больше времени, чем ожидалось.Запись Spark-Cassandra занимает больше времени, чем ожидалось
Код выглядит следующим образом:
val range = sc.parallelize(0 to 100)
val rdd1 = range.map(x => (some_value, x)).joinWithCassandraTable[Event](keyspace_name, table2).select("col1", "col2", "col3", "col4", "col5", "col6", "col7").map(x => x._2)
val rdd2: RDD[((Int, String, String, String), Iterable[Event])] = rdd1.keyBy(r => (r.col1, r.col2, r.col3, r.col4)).groupByKey
val rdd3 = rdd2.mapValues(iter => someFunction(iter.toList.sorted))
//STORE 1
rdd3.map(r => (r._1._1, r._1._2, r._1._3, r._1._4, r._2.split('|')(1).toDouble)).saveToCassandra(keyspace_name, table1, SomeColumns("col1","col2", "col3","col4", "col5"))
//STORE 2
rdd3.map(r => (to, r._1%100, to, "MANUAL_"+r._1+"_"+r._2+"_"+r._3+"_"+r._4+"_"+java.util.UUID.randomUUID(), "M", to, r._4, r._3, r._1, r._5, r._2)).saveToCassandra(keyspace_name, table2, SomeColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11"))
Для около миллиона записей, ЗАП 1 занимает около 40 секунд и МАГАЗИНА 2 (небольшое изменение в rdd3) занимает больше, чем за минуту. Я не уверен, где я ошибаюсь или почему занимает так много времени. Моя искровым среда выглядит следующим образом:
ДСЕ 4.8.9 с 6 узлами 70 ГБ ОЗУ 12 ядер каждый
Любая помощь оценена.
вы пытались добавить rdd3 в контрольную точку и посмотреть, он идет быстрее? –
Nope. Он не ускоряется –