2017-01-06 10 views
1

Как проверить пустой РДУ в PySpark

tweetStream.foreachRDD((rdd, time) => { 
    val count = rdd.count() 
    if (count > 0) { 
    var fileName = outputDirectory + "/tweets_" + time.milliseconds.toString  
    val outputRDD = rdd.repartition(partitionsEachInterval) 
    outputRDD.saveAsTextFile(fileName) 
} 

Я пытаюсь проверить значение счетчика или пустой RDD в потоковых данных в питона образом, выносливом поиске путей, а также пытался примеры ссылки ниже. http://spark.apache.org/docs/latest/streaming-programming-guide.html

ответ

3

RDD.isEmpty:

Возвращает истину, если и только если РДД не содержит на всех элементов.

sc.range(0, 0).isEmpty() 
True 
sc.range(0, 1).isEmpty() 
False 
0

Попробуйте использовать следующий фрагмент кода.

def process_rdd(rdd): 
    print rdd.count() 
    print("$$$$$$$$$$$$$$$$$$$$$$") 
    streamrdd_to_df(rdd) 

def empty_rdd(): 
    print "###The current RDD is empty. Wait for the next complete RDD ###" 

clean.foreachRDD(lambda rdd: empty_rdd() if rdd.count() == 0 else process_rdd(rdd)) 
+0

Пожалуйста, отформатируйте свой ответ, чтобы улучшить качество .. особенно добавьте блоки кода. – nakashu

 Смежные вопросы

  • Нет связанных вопросов^_^