Я пытаюсь сопоставить следующую функцию на RDD, передавая из каждого элемента start, length и id. k и kmers одинаковое значение для всех элементов в RDD. Проблема заключается в том, что я звоню RDD.apply в этой функции, что я отображение, поэтому у меня есть эта ошибка:Как создавать карты и применять их в RDD и избегать трансформации внутри преобразования
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;
Ниже приведен код:
def getGapSequence(start: Int, length: Int, id: String, k: Int, kmers: RDD[((String, Int), String)]): String ={
var tempStart = start
var totalGap = ""
do{
val tempKmer = kmers.apply((id, start))
if(tempKmer != ""){
totalGap += tempKmer
tempStart += k
}else{
totalGap += 'N'
tempStart += 1
}
}while(totalGap.length < length)
totalGap.take(length)
}
Я, безусловно, необходимо получить доступ к пункты клмеров по их ключу, потому что иначе я не буду знать, какую строку добавить. Я также попытался преобразования kmers к другому типу данных (т.е. Map, Array), но я получаю
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 102:102 was 250174590 bytes, which exceeds max allowed: spark.akka.frameSize (16777216 bytes)
, так как размер данных, я работаю с настолько огромен. Я не могу разделить данные, и мне нужна приведенная выше информация. Как я могу достичь этого, желательно без увеличения искрового кадра? Спасибо.
Я успешно запускаю свою программу с небольшими наборами данных, где некоторые из моих других функций проходят вокруг RDD без передачи в SparkContext (который объявлен в моем основном методе). Не должно ли это работать и с этой функцией? Также я не могу сделать предложение 2, потому что это вызывает проблемы с несколькими «SparkContext». Не могли бы вы объяснить свое первое предложение? Все мои методы содержатся в одном объекте. Спасибо! – Alex