2

Я пытаюсь реализовать подход самообучения для обучения классификатора. Я использую искру 1.6.0. Проблема в том, когда я сопоставляю RDD с другим, я ошибаюсь. Тот же код работает нормально для небольших наборов данных по набору данных кластера, он просто гаснет.Spark: RDD отсутствует записи на каждой итерации

println("INITIAL TRAINING SET SIZE : " + trainingSetInitial.count()) 
for(counter <- 1 to 10){ 
    println("------------------- This is the_" + counter + " run -----------------") 
    println("TESTING SET SIZE : " + testing.count()) 

    val lowProbabilitiesSet = testing.flatMap { item => 
    if (model.predictProbabilities(item._2)(0) <= 0.75 && model.predictProbabilities(item._2)(1) <= 0.75) { 
     List(item._1) 
    } else { 
     None 
    }}.cache() 
    val highProbabilitiesSet = testing.flatMap { item => 
    if (model.predictProbabilities(item._2)(0) > 0.75 || model.predictProbabilities(item._2)(1) > 0.75) { 
     List(item._1 +","+ model.predict(item._2).toDouble) 
    } else { 
     None 
    }}.cache() 
    println("LOW PROBAB SET : " + lowProbabilitiesSet.count()) 
    println("HIGH PROBAB SET : " + highProbabilitiesSet.count()) 

    trainingSetInitial = trainingSetInitial.union(highProbabilitiesSet.map(x => LabeledPoint(List(x)(0).split(",")(8).toString.toDouble, htf.transform(List(x)(0).toString.split(",")(7).split(" "))))) 
    model = NaiveBayes.train(trainingSetInitial, lambda = 1.0) 
    println("NEW TRAINING SET : " + trainingSetInitial.count()) 

    previousCount = lowProbabilitiesSet.count() 
    testing = lowProbabilitiesSet.map { line => 
    val parts = line.split(',') 
    val text = parts(7).split(' ') 
    (line, htf.transform(text)) 
    } 
    testing.checkpoint() 
} 

Это бревно из правильного вывода:

начальной подготовки SET SIZE: 238,182

------------------ - Это the_1 запустить -----------------

ТЕСТИРОВАНИЕ SET РАЗМЕР: 3.158.722

LOW ВЕРОЯТНОСТЕЙ SET: 22,996

ВЫСОКИЙ ВЕРОЯТНОСТЕЙ SET: 3.135.726

НОВЫЙ УЧЕБНЫЙ КОМПЛЕКТ: 3373908

------------------- Это the_2 бег - ---------------

ТЕСТИРОВАНИЕ SET РАЗМЕР: 22996

LOW ВЕРОЯТНОСТЕЙ SET: 566

ВЫСОКИЙ ВЕРОЯТНОСТЕЙ SET: 22430

НОВЫЙ УЧЕБНЫЙ SET: 3396338

А вот когда начинается проблема (большой входных наборов данных):

начальной подготовки SET SIZE: 31.990.660

----- -------------- Это the_1 запустить -----------------

ТЕСТИРОВАНИЕ SET РАЗМЕР: 423.173.780

LOW ВЕРОЯТНОСТЕЙ SET: 62.615.460

ВЫСОКИЙ ВЕРОЯТНОСТЕЙ SET: 360.558.320

НОВЫЙ УЧЕБНЫЙ КОМПЛЕКТ: 395265857

----------------- - Это the_2 запустить -----------------

ТЕСТИРОВАНИЕ SET РАЗМЕР: 52673986

LOW ВЕРОЯТНОСТЕЙ SET: 51460875

ВЫСОКИЙ ВЕРОЯТНОСТЕЙ SET: 1213111

НОВЫЙ УЧЕБНЫЙ КОМПЛЕКТ: 401950263

'LOW ВЕРОЯТНОСТЕЙ SET' на первой итерации должен быть 'ИСПЫТАНИЯ SET' для второй итерации. Где-то 10 миллионов записей исчезают.Также «NEW TRAINING SET» на 1-й итерации должен быть конкатенацией «НАЧАЛЬНАЯ ПОДГОТОВКА» и «ВЫСОКОЙ ПРОБНОЙ УСТАНОВКИ». Опять цифры не совпадают.

У меня не было никаких ошибок во время работы кода. Я попытался кэшировать каждый набор и отключить в конце каждой итерации (только HIGH и LOW), но те же результаты. Я также попытался проверить контрольные точки, не работал. Почему это происходит?

EDIT

Только для тестирования я не создавал новую модель внутри цикла, чтобы посмотреть, что происходит:

for(counter <- 1 to 5){ 
    println("------------------- This is the_" + counter + " run !!! -----------------") 
    var updated_trainCnt = temp_train.count(); 
    var updated_testCnt = test_set.count(); 
    println("Updated Train SET SIZE: " + updated_trainCnt) 
    println("Updated Testing SET SIZE: " + updated_testCnt) 

    val highProbabilitiesSet = test_set.filter { item => 
    val output = model.predictProbabilities(item._2) 
    output(0) > 0.75 || output(1) > 0.75 
    }.map(item => (item._1 + "," + model.predict(item._2), item._2)).cache() 

    test_set = test_set.filter { item => 
    val output = model.predictProbabilities(item._2) 
    output(0) <= 0.75 && output(1) <= 0.75 
    }.map(item => (item._1, item._2)).cache() 
    var hiCnt = highProbabilitiesSet.count() 
    var lowCnt = test_set.count() 
    println("HIGH PROBAB SET : " + hiCnt) 
    println("LOW PROBAB SET : " + lowCnt) 
    var diff = updated_testCnt - hiCnt - lowCnt 
    if (diff!=0) println("ERROR: Test set not correctly split into high low" + diff) 
    temp_train= temp_train.union(highProbabilitiesSet.map(x => LabeledPoint(x._1.toString.split(",")(8).toDouble, x._2))).cache() 
    println("NEW TRAINING SET: " + temp_train.count()) 
//  model = NaiveBayes.train(temp_train, lambda = 1.0, modelType = "multinomial") 
    println("HIGH PROBAB SET : " + highProbabilitiesSet.count()) 
    println("LOW PROBAB SET : " + test_set.count()) 
    println("NEW TRAINING SET: " + temp_train.count()) 
} 

Производимых номера, от оригинальной модели был в порядке, даже объединение RDD были выполнены без проблем. Но остается большой вопрос: как классификация моделирует набор тренировок (lowProbabilititesSet), даже не изменяя его в конце каждого цикла (или других RDD)?

Консольные бревна и искровые журналы не показывают никакой ошибки или раздавливания палача. Как процесс обучения классификации искажает мои данные?

+0

Пожалуйста, упростите код до минимально необходимого количества логики, это поможет как вам, так и нам найти проблему. –

+0

Я думаю, что это проблема здесь, попробуйте unionAll. union удаляет дубликат между двумя наборами –

+0

Я имею дело с RDD, а не с фреймами данных. –

ответ

-2

Я не вижу проблемы сразу. Пожалуйста, уменьшите код до фактической проблемы. Первое, что я хотел бы предложить, чтобы переписать flatMap операции на filter:

val highProbabilitiesSet = testing.flatMap { item => 
    if (model.predictProbabilities(item._2)(0) > 0.75 || model.predictProbabilities(item._2)(1) > 0.75) { 
     List(item._1 +","+ model.predict(item._2).toDouble) 
    } else { 
    None 
    } 
}.cache() 

To:

val highProbabilitiesSet = testing.filter { item => 
    val output = model.predictProbabilities(item._2) 
    output(0) > 0.75 || output(1) > 0.75 
}.map(item => (item._1, model.predict(item._2).toDouble)).cache() 
+0

Я проверю это, но я думаю, что это просто другой способ наложения карты с использованием карты. Я не думаю, что это произвело бы что-то другое. Я скоро сообщу вам –

+0

Поскольку я не думал, что ничего не изменилось, оно получилось нормально на небольшом наборе данных, но это дало эти странные результаты в большом наборе данных –

0

Хотя я до сих пор не понял, почему это происходит, как хака я спустил РДУ к HDFS и сделал скрипт bash, который запускает класс итеративно и каждый раз считывает данные из HDFS. Когда я понял, проблема возникает, когда я тренирую классификатор внутри цикла.

 Смежные вопросы

  • Нет связанных вопросов^_^