0

У меня есть искровое задание, которое работает, считывает данные из одной таблицы cassandra и возвращает результат обратно в две таблицы с небольшими изменениями. Моя проблема в том, что работа занимает гораздо больше времени, чем ожидалось.Запись Spark-Cassandra занимает больше времени, чем ожидалось

Код выглядит следующим образом:

val range = sc.parallelize(0 to 100) 

val rdd1 = range.map(x => (some_value, x)).joinWithCassandraTable[Event](keyspace_name, table2).select("col1", "col2", "col3", "col4", "col5", "col6", "col7").map(x => x._2) 

val rdd2: RDD[((Int, String, String, String), Iterable[Event])] = rdd1.keyBy(r => (r.col1, r.col2, r.col3, r.col4)).groupByKey 

val rdd3 = rdd2.mapValues(iter => someFunction(iter.toList.sorted)) 

//STORE 1 

rdd3.map(r => (r._1._1, r._1._2, r._1._3, r._1._4, r._2.split('|')(1).toDouble)).saveToCassandra(keyspace_name, table1, SomeColumns("col1","col2", "col3","col4", "col5")) 

//STORE 2 

rdd3.map(r => (to, r._1%100, to, "MANUAL_"+r._1+"_"+r._2+"_"+r._3+"_"+r._4+"_"+java.util.UUID.randomUUID(), "M", to, r._4, r._3, r._1, r._5, r._2)).saveToCassandra(keyspace_name, table2, SomeColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11")) 

Для около миллиона записей, ЗАП 1 занимает около 40 секунд и МАГАЗИНА 2 (небольшое изменение в rdd3) занимает больше, чем за минуту. Я не уверен, где я ошибаюсь или почему занимает так много времени. Моя искровым среда выглядит следующим образом:

ДСЕ 4.8.9 с 6 узлами 70 ГБ ОЗУ 12 ядер каждый

Любая помощь оценена.

+0

вы пытались добавить rdd3 в контрольную точку и посмотреть, он идет быстрее? –

+0

Nope. Он не ускоряется –

ответ

0

Позвольте мне сделать мое предположение. Для более точного ответа необходимы журналы, перфорированный вывод и модель данных C *. Но некоторые математики: Вы

  • joinWithCassandra - случайный C * прочитать
  • saveToCassandra - сек C * запись
  • искрового переделу?/Уменьшить

(Я ожидаю, что saveToCassadndra занимает половину всего времени) , и если вы не выполняете никаких запросов, прежде чем вы должны минус 12-20 сек для искры, чтобы начать исполнителей и другие вещи

SO для 1M записей на 6nodes и 40 секунд, которые вы получили: 1000000/6/40 = 4166 записей/сек/узел. Это неплохо. 10K/s на узел со смешанной рабочей нагрузкой - хороший результат.

Вторая запись в 2 раза больше (11 столбцов по сравнению с 5), и она запускается после первого, поэтому я ожидаю, что Cassandra начнет проливать предыдущие данные на диск в этот момент, так что вы можете получить более высокую деградацию здесь.

Правильно ли я понимаю, что при добавлении вызова rdd3.cache() ничего не изменилось для второго запуска? Это странно.

и да, вы можете получить лучшие результаты с настройкой C модели данных * и Спарк/C * Параметры

 Смежные вопросы

  • Нет связанных вопросов^_^