Вот пример senario, у нас есть запись данных в реальном времени в cassandra, и мы хотим агрегировать данные в разных временных диапазонах. То, что я пишу код, как показано ниже:Как парализовать работу RDD при использовании искрового соединителя cassandra для агрегирования данных?
val timeRanges = getTimeRanges(report)
timeRanges.foreach { timeRange =>
val (timestampStart, timestampEnd) = timeRange
val query = _sc.get.cassandraTable(report.keyspace, utilities.Helper.makeStringValid(report.scope)).
where(s"TIMESTAMP > ?", timestampStart).
where(s"VALID_TIMESTAMP <= ?", timestampEnd)
......do the aggregation work....
, что проблема кода является то, что для каждого интервала времени, работа агрегации работает не в parallized. Мой вопрос в том, как я могу скомпилировать работу агрегации? Поскольку RDD не может работать в другом RDD или будущем? Есть ли способ сгладить работу, или мы не можем использовать искровой соединитель здесь?
joinWithCassandraTable, который присоединится к двум таблицам, но в моем коде есть только одна таблица источников, значение диапазона генерируется. – wherby
Он соединяет 1 RDD с 1 столом. Нет необходимости, чтобы первый RDD был таблицей. – RussS
Какова фактическая схема таблицы, к которой вы пытаетесь получить доступ? – RussS