2015-11-11 5 views
1

Похоже, что RDD.take() просто повторяет последний элемент, прочитанный при поддержке SequenceFile.
Например:Spark RDD take() с файлом последовательности

val rdd = sc.sequenceFile("records.seq", classOf[LongWritable], classOf[RecordWritable]) 
val records: Array[(LongWritable, RecordWritable)] = rdd.take(5) 
System.out.println(records.map(_._2.toString).mkString("\n")) 

Выходы:

Record(3.1, 2.5) 
Record(3.1, 2.5) 
Record(3.1, 2.5) 
Record(3.1, 2.5) 
Record(3.1, 2.5) 

Даже если я знаю, что строки являются уникальными.

Эта проблема также существует для sc.binaryRecords().

Я понимаю, что это, вероятно, связано с проблемой кэширования Hadoop Writable, но есть ли планы исправить это? Есть ли какие-нибудь работы?

ответ

1

Я попытался повторить свой вопрос, и да, я тоже видел подобное поведение при вызове принимать непосредственно на результат sc.sequenceFile(). Но удалось найти работу:

Примечание: Я объясняю использование LongWritable и Text вместо WriteWritable. Я не уверен, импорта, необходимого для RecordWritable
Мой файл последовательности, имеющий: (0,0) (1,1) (2,2) ...

val rdd = sc.sequenceFile("sequencefile.seq", classOf[LongWritable], classOf[Text]) 
val map = rdd.map(case (k,v) => (k.get(),v.toString())) 
map.take(1); 
res5: Array[(Long, String)] = Array((0,0)) 
map.take(5); 
res4: Array[(Long, String)] = Array((0,0), (1,1), (2,2), (3,3), (4,4)) 
+1

Спасибо за ответ. Вот пункт джиры в Спарке. https://issues.apache.org/jira/browse/SPARK-1018 Похоже, это также влияет на collect(): – shj