2016-07-05 3 views
0

Я собираю данные из Кассандры, используя разъем Spark и Spark-Cassandra. У меня есть веб-приложение для него с одним общим SparkContext и REST api. Обработка имеет следующий поток:Как хранить таблицу Кассандры в памяти Spark в течение длительного времени?

  1. пользователя Cassandra таблицу
  2. подготовить его для фильтрации (последовательность искрового преобразований)
  3. фильтра, полученного RDD в соответствии с параметрами API вызова

В алгоритме выше только третьего шага различается для каждого вызова (зависит от параметров запроса api). Запрос Api выполняется в параллельном режиме (поток за запрос). Поскольку данные в таблице не очень динамичны, и у меня достаточно памяти для моих искровых работников, чтобы хранить целую таблицу, я хочу сохранить мой RDD после второго шага и каждый запрос просто фильтровать уже сохраненный RDD. Также я хочу периодически обновлять этот RDD. Каков наилучший способ его достижения?

+0

Я не уверен, что правильно понимаю, что вы хотите. Нужно ли разделять RDD между различными контекстами? В противном случае простой ['persist'] (http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) должен сделать трюк. –

+0

@Hawknight нет, у меня есть только один контекст. Могу ли я сохранить сохраненную таблицу внутри объекта scala? – Cortwave

+0

Ну, если ваша таблица преобразуется как RDD, вы можете просто сохранить свой RDD и сохранить ссылку на постоянную RDD (технически только сохраняющуюся после вызова действия) для последующих вызовов. –

ответ

1

Вы можете просто позвонить persist на RDD после шага 2. RDD будет вычислен и кэширован при вызове первого действия. Когда вам нужно обновить данные, просто позвоните unpersist. Это заставит Spark отказаться от старого кеша, а затем сохранить новый кеш при выполнении действия. В принципе, вы сделаете что-то подобное.

var data = loadAndFilter() 
while (!stop) { 
    data.persist() 
    // Do step 3 

    // Drop the old cache 
    data.unpersist(false) 
    // Load the fresh data 
    data = loadAndFilter() 
}