Я пытаюсь реализовать подход самообучения для обучения классификатора. Я использую искру 1.6.0. Проблема в том, когда я сопоставляю RDD с другим, я ошибаюсь. Тот же код работает нормально для небольших наборов данных по набору данных кластера, он просто гаснет.Spark: RDD отсутствует записи на каждой итерации
println("INITIAL TRAINING SET SIZE : " + trainingSetInitial.count())
for(counter <- 1 to 10){
println("------------------- This is the_" + counter + " run -----------------")
println("TESTING SET SIZE : " + testing.count())
val lowProbabilitiesSet = testing.flatMap { item =>
if (model.predictProbabilities(item._2)(0) <= 0.75 && model.predictProbabilities(item._2)(1) <= 0.75) {
List(item._1)
} else {
None
}}.cache()
val highProbabilitiesSet = testing.flatMap { item =>
if (model.predictProbabilities(item._2)(0) > 0.75 || model.predictProbabilities(item._2)(1) > 0.75) {
List(item._1 +","+ model.predict(item._2).toDouble)
} else {
None
}}.cache()
println("LOW PROBAB SET : " + lowProbabilitiesSet.count())
println("HIGH PROBAB SET : " + highProbabilitiesSet.count())
trainingSetInitial = trainingSetInitial.union(highProbabilitiesSet.map(x => LabeledPoint(List(x)(0).split(",")(8).toString.toDouble, htf.transform(List(x)(0).toString.split(",")(7).split(" ")))))
model = NaiveBayes.train(trainingSetInitial, lambda = 1.0)
println("NEW TRAINING SET : " + trainingSetInitial.count())
previousCount = lowProbabilitiesSet.count()
testing = lowProbabilitiesSet.map { line =>
val parts = line.split(',')
val text = parts(7).split(' ')
(line, htf.transform(text))
}
testing.checkpoint()
}
Это бревно из правильного вывода:
начальной подготовки SET SIZE: 238,182
------------------ - Это the_1 запустить -----------------
ТЕСТИРОВАНИЕ SET РАЗМЕР: 3.158.722
LOW ВЕРОЯТНОСТЕЙ SET: 22,996
ВЫСОКИЙ ВЕРОЯТНОСТЕЙ SET: 3.135.726
НОВЫЙ УЧЕБНЫЙ КОМПЛЕКТ: 3373908
------------------- Это the_2 бег - ---------------
ТЕСТИРОВАНИЕ SET РАЗМЕР: 22996
LOW ВЕРОЯТНОСТЕЙ SET: 566
ВЫСОКИЙ ВЕРОЯТНОСТЕЙ SET: 22430
НОВЫЙ УЧЕБНЫЙ SET: 3396338
А вот когда начинается проблема (большой входных наборов данных):
начальной подготовки SET SIZE: 31.990.660
----- -------------- Это the_1 запустить -----------------
ТЕСТИРОВАНИЕ SET РАЗМЕР: 423.173.780
LOW ВЕРОЯТНОСТЕЙ SET: 62.615.460
ВЫСОКИЙ ВЕРОЯТНОСТЕЙ SET: 360.558.320
НОВЫЙ УЧЕБНЫЙ КОМПЛЕКТ: 395265857
----------------- - Это the_2 запустить -----------------
ТЕСТИРОВАНИЕ SET РАЗМЕР: 52673986
LOW ВЕРОЯТНОСТЕЙ SET: 51460875
ВЫСОКИЙ ВЕРОЯТНОСТЕЙ SET: 1213111
НОВЫЙ УЧЕБНЫЙ КОМПЛЕКТ: 401950263
'LOW ВЕРОЯТНОСТЕЙ SET' на первой итерации должен быть 'ИСПЫТАНИЯ SET' для второй итерации. Где-то 10 миллионов записей исчезают.Также «NEW TRAINING SET» на 1-й итерации должен быть конкатенацией «НАЧАЛЬНАЯ ПОДГОТОВКА» и «ВЫСОКОЙ ПРОБНОЙ УСТАНОВКИ». Опять цифры не совпадают.
У меня не было никаких ошибок во время работы кода. Я попытался кэшировать каждый набор и отключить в конце каждой итерации (только HIGH и LOW), но те же результаты. Я также попытался проверить контрольные точки, не работал. Почему это происходит?
EDIT
Только для тестирования я не создавал новую модель внутри цикла, чтобы посмотреть, что происходит:
for(counter <- 1 to 5){
println("------------------- This is the_" + counter + " run !!! -----------------")
var updated_trainCnt = temp_train.count();
var updated_testCnt = test_set.count();
println("Updated Train SET SIZE: " + updated_trainCnt)
println("Updated Testing SET SIZE: " + updated_testCnt)
val highProbabilitiesSet = test_set.filter { item =>
val output = model.predictProbabilities(item._2)
output(0) > 0.75 || output(1) > 0.75
}.map(item => (item._1 + "," + model.predict(item._2), item._2)).cache()
test_set = test_set.filter { item =>
val output = model.predictProbabilities(item._2)
output(0) <= 0.75 && output(1) <= 0.75
}.map(item => (item._1, item._2)).cache()
var hiCnt = highProbabilitiesSet.count()
var lowCnt = test_set.count()
println("HIGH PROBAB SET : " + hiCnt)
println("LOW PROBAB SET : " + lowCnt)
var diff = updated_testCnt - hiCnt - lowCnt
if (diff!=0) println("ERROR: Test set not correctly split into high low" + diff)
temp_train= temp_train.union(highProbabilitiesSet.map(x => LabeledPoint(x._1.toString.split(",")(8).toDouble, x._2))).cache()
println("NEW TRAINING SET: " + temp_train.count())
// model = NaiveBayes.train(temp_train, lambda = 1.0, modelType = "multinomial")
println("HIGH PROBAB SET : " + highProbabilitiesSet.count())
println("LOW PROBAB SET : " + test_set.count())
println("NEW TRAINING SET: " + temp_train.count())
}
Производимых номера, от оригинальной модели был в порядке, даже объединение RDD были выполнены без проблем. Но остается большой вопрос: как классификация моделирует набор тренировок (lowProbabilititesSet), даже не изменяя его в конце каждого цикла (или других RDD)?
Консольные бревна и искровые журналы не показывают никакой ошибки или раздавливания палача. Как процесс обучения классификации искажает мои данные?
Пожалуйста, упростите код до минимально необходимого количества логики, это поможет как вам, так и нам найти проблему. –
Я думаю, что это проблема здесь, попробуйте unionAll. union удаляет дубликат между двумя наборами –
Я имею дело с RDD, а не с фреймами данных. –