2014-10-28 2 views
0

Я бы хотел сбросить первые n RDD из DStream. Я попытался использовать следующую функцию вместе с преобразованием, но она не работает (ERROR OneForOneStrategy: org.apache.spark.SparkContext java.io.NotSerializableException), и я не думаю, что это выполнило бы мою настоящую цель - удалить RDD, потому что он будет возвращать пустые.Как удалить RDD из DStream в Spark Streaming?

var num = 0 
def dropNrdds(myRDD: RDD[(String, Int)], dropNum: Int) : RDD[(String, Int)] = { 
    if (num < dropNum) { 
     num = num + 1 
     return myRDD 
    } 
    else { 
     return sc.makeRDD(Seq()) 
    } 
} 

ответ

1

Ошибка в том, что ваша функция относится к вашей var num и содержащий класс не Serializable. Ваша функция будет вызываться разными узлами кластера, поэтому все, от чего она зависит, должна быть Serializable, и вы не можете делиться переменной между различными вызовами вашей функции (потому что они могут выполняться на разных узлах кластера).

Это кажется очень странным, хочет сбросить определенное количество RDD с от DStream, учитывая, что путь конкретного DStream расщепляется довольно много деталь реализации. Возможно, метод slice может быть сделан для того, чтобы делать то, что вы хотите?

+0

Есть ли способ отрезать первые n окон, но все равно получить остальную часть потока? Я посмотрел на эту функцию, и я думаю, что мне нужно указать конец фрагмента. Моя мотивация: http://stackoverflow.com/questions/26445407/how-can-i-perform-an-operation-on-two-windowed-dstreams-with-an-offset – Zatricion

+0

Или есть способ, которым я могу поддерживать отслеживать фрагмент, который я сделал, и продолжать перемещать его, когда все добавляется к входному потоку? – Zatricion

0

Вы получаете сообщение об ошибке, потому что, я предполагаю, что Вы вызываете эту функцию из

foreachRdd

петлю, которая на самом деле получает выполняется на исполнителями узлов, и если вы хотите что-то, чтобы выполняться на узлы-исполнители, в которых код кода должен быть Serializable, и SparkContext (sc, вы ссылаетесь на него внутри вашего метода dropNrdds) не является Serializable, поэтому вы получаете эту ошибку.

и подходит к вашему актуальному вопросу.

не уверен, о вашем требовании, но

вы можете создать DataFrame для вашего РДА и выберите записи, которые соответствуют вашим критериям. и игнорировать остальные.

или

вы можете использовать фильтр и создать свежий RDD с данными фильтрами.