Я читаю файлы последовательности Hadoop, используя Spark (v1.6.1). После кэширования RDD содержимое в RDD становится недействительным (последняя запись дублируется n
раз).Cached Spark RDD (чтение из файла последовательности) имеет недопустимые записи, как я могу это исправить?
Вот мой фрагмент кода:
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
object Main {
def main(args: Array[String]) {
val seqfile = "data-1.seq"
val conf: SparkConf = new SparkConf()
.setAppName("..Buffer..")
.setMaster("local")
.registerKryoClasses(Array(classOf[Text]))
val sc = new SparkContext(conf)
sc.parallelize((0 to 1000).toSeq) //creating a sample sequence file
.map(i => (new Text(s"$i"), new Text(s"${i*i}")))
.saveAsHadoopFile(seqfile, classOf[Text], classOf[Text],
classOf[SequenceFileOutputFormat[Text, Text]])
val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text])
.cache()
.map(t => {println(t); t})
.collectAsMap()
println(c)
println(c.size)
sc.stop()
}
}
Выход:
(1000,1000000)
(1000,1000000)
(1000,1000000)
(1000,1000000)
(1000,1000000)
...... //Total 1000 lines with same content as above ...
Map(1000 -> 1000000)
1
EDIT: Для будущих посетителей: Если вы читаете файл последовательности, как я сделал в приведенном выше коде фрагмент, см. принятый ответ. Простое решение, чтобы сделать копию Hadoop Writable
Например:
val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text])
.map(t =>(new Text(t._1), new Text(t._2))) //Make copy of writable instances
Возможно, это связано с [SPARK-993] (https://issues.apache.org/jira/browse/SPARK-993). – climbage
Да, это связано с [SPARK-993] (https://issues.apache.org/jira/browse/SPARK-993). Благодарю. –