2017-02-08 7 views
0

В приложении Spark Streaming есть внешний источник данных (реляционная база данных), который мне нужно запрашивать каждые 10 минут и сделать результаты доступными для моего конвейера обработки потока.Spark Streaming: сделать обновляемый набор результатов доступным для логики обработки потока

Я не совсем понимаю, как правильно это сделать. Accumulators будут добавлять только (как описано в документации), но я нашел это:

/** 
    * Set the accumulator's value; only allowed on master 
    */ 
    def setValue(newValue: R) { 
    this.value = newValue 
    } 

и Broadcast variables только однократной записи https://spark.apache.org/docs/1.6.2/programming-guide.html#broadcast-variables

аспект планирования также не ясно для меня.

Есть ли способ сделать обновленный набор результатов доступным для логики обработки потока?

PS Это, кажется, очень похоже на то, что мне нужно How can I update a broadcast variable in spark streaming?

+0

«Мне нужно запросить через каждые 10 минут и сделать результаты для моего конвейера обработки потока» Что вы имеете в виду здесь? Есть таблица, к которой прилагается, и вам нужно прочитать последние строки? Запрос, который нужно выполнить? Какие другие источники данных находятся в вашем потоковом конвейере? Больше информации о том, что вы пытаетесь сделать здесь, поможет. –

+0

Фактически существует служба с API поверх базы данных, поэтому я просто запрашиваю службу и возвращает обновленную версию данных. Я использую прямой подход Kafka и должен «обогащать» поток самой современной версией, но каждый раз запрашивать API (даже если внутри раздела 'foreachPartition') очень дорого. –

+1

Возможный дубликат [Периодическое обновление глобальной переменной в Spark] (http://stackoverflow.com/questions/33748528/updating-a-global-variable-periodically-in-spark) – Aastha

ответ

0

Я делаю сохранить в Java и работает достаточно. Существует аналогичный ответ здесь Updating a global variable periodically in Spark, а также упоминается в вопросе

public Broadcast<Map<String, List<String>>> updateBroadcastVariable(
    SparkContext sparkContext, DatabaseService databaseService) { 
Date d = Calendar.getInstance().getTime(); 
long diff = d.getTime()-mysqlLastUpdatedAt.getTime(); 
if (updatedVar == null || diff > 60000) { 
    if (var != null) 
    updatedVar.unpersist(); 
    mysqlLastUpdatedAt = new Date(System.currentTimeMillis()); 
    Map<String, List<String>> map = new LinkedHashMap<>(); 

    List<String> dbData = databaseService.refreshData(JavaSparkContext.fromSparkContext(sparkContext)); 
    } 
    updatedVar = JavaSparkContext.fromSparkContext(sparkContext).broadcast(dbData); 
} 
return updatedVar; 
}