2016-01-12 1 views
1

Вот пример senario, у нас есть запись данных в реальном времени в cassandra, и мы хотим агрегировать данные в разных временных диапазонах. То, что я пишу код, как показано ниже:Как парализовать работу RDD при использовании искрового соединителя cassandra для агрегирования данных?

val timeRanges = getTimeRanges(report) 
timeRanges.foreach { timeRange => 
      val (timestampStart, timestampEnd) = timeRange 

      val query = _sc.get.cassandraTable(report.keyspace, utilities.Helper.makeStringValid(report.scope)). 
      where(s"TIMESTAMP > ?", timestampStart). 
      where(s"VALID_TIMESTAMP <= ?", timestampEnd) 
     ......do the aggregation work.... 

, что проблема кода является то, что для каждого интервала времени, работа агрегации работает не в parallized. Мой вопрос в том, как я могу скомпилировать работу агрегации? Поскольку RDD не может работать в другом RDD или будущем? Есть ли способ сгладить работу, или мы не можем использовать искровой соединитель здесь?

ответ

1

Используйте функцию joinWithCassandraTable. Это позволяет использовать данные из одного RDD для доступа к C * и вытягивать записи так же, как в вашем примере.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12

joinWithCassandraTable использует драйвер Java, чтобы выполнить одну запроса для каждого раздела, требуемого источника RDD, так что никаких не-необходимости данные не будут запрошены или последовательную форму. Это означает, что соединение между любыми RDD и таблицей Cassandra можно предварительно сформировать без полной таблицы . При предварительном формировании между двумя таблицами Cassandra, которые разделяют ту же клавишу раздела , это не потребует перемещения данных между машинами. Во всех случаях этот метод будет использовать разбиение и размещение исходного кода RDD для определения местоположения.

+0

joinWithCassandraTable, который присоединится к двум таблицам, но в моем коде есть только одна таблица источников, значение диапазона генерируется. – wherby

+0

Он соединяет 1 RDD с 1 столом. Нет необходимости, чтобы первый RDD был таблицей. – RussS

+0

Какова фактическая схема таблицы, к которой вы пытаетесь получить доступ? – RussS

0

Наконец, мы используем соединение, чтобы присоединиться к каждому RDD и сделать их парализованными.