0

Я использую Спарк Streaming, чтобы принести твиты из твиттера, создавая StreamingContext как:
val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1))
Спарк Streaming: Регистрация Dstream партии в одну выходную папку

и создание щебет потока как:
val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)),filters)

затем сохраняя его как текстовый файл
tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")

и проблема в том, что твиты сохраняются в виде папок, основанных на пакетном времени, но я нужны все данные каждой партии в одной папке.

Есть ли обходной путь для этого?

Благодаря

ответ

0

Мы можем сделать это, используя искру в SQL новый экономия DataFrame API, которые позволяют добавить его к существующей продукции. По умолчанию saveAsTextFile не сможет сохранить каталог с существующими данными (см. https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes). https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations описывает, как настроить контекст Spark SQL для использования с Spark Streaming.

Предполагая, что вы скопировать часть из направляющей с SQLContextSingleton, Результирующий код будет выглядеть примерно так:

data.foreachRDD{rdd => 
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
    // Convert your data to a DataFrame, depends on the structure of your data 
    val df = .... 
    df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString)) 
} 

(Обратите внимание, в приведенном выше примере используется JSON для сохранения результата, но вы можете использовать различные форматы вывода слишком).

+0

Спасибо, Холден, сохранил мой день ... –

+0

Могу ли я сохранить DF как текстовый файл? Как я вижу, тип по умолчанию - это паркет. Каким должен быть источник? –

+0

@Holden, @HussainShaik У меня был тот же вопрос и я использовал ваше решение, но продолжаю получать ошибку - не найден: путь значения '[error] df.save (" com.databricks.spark.csv ", SaveMode.Append, Карта («путь» -> путь.toString)). Любой способ исправить это? – serendipity