2017-02-16 5 views
2

У меня есть RDD, который был бы как ((String, String), TimeStamp). У меня большое количество записей, и я хочу выбрать для каждой клавиши запись с самым последним значением TimeStamp. Я пробовал следующий код и все еще пытаюсь это сделать. Может ли кто-нибудь помочь мне сделать это?Как подобрать самую раннюю дату отметки времени из RDD в scala

Код ниже я попытался не так и не работает, а

val context = sparkSession.read.format("jdbc") 
    .option("driver", "com.mysql.jdbc.Driver") 
    .option("url", url) 
    .option("dbtable", "student_risk") 
    .option("user", "user") 
    .option("password", "password") 
    .load() 
context.cache(); 

val studentRDD = context.rdd.map(r => ((r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))), r.getTimestamp(r.fieldIndex("risk_date_time")))) 
val filteredRDD = studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => (x._2.compareTo(y._2))) 
+0

z => (z._1, z._2)) неверно. Этот фрагмент вместо этого вытаскивает курсор_ид и student_id. Вы должны растянуть третью переменную. Я плохо разбираюсь в scala, поэтому не могу предоставить точную часть кода. – srikanth

+0

BTW - название гласит, что вы ищете «самое раннее», в то время как текст говорит «последнее» - я ответил на основании заголовка, очевидно, это легко изменить. –

+0

Небольшая проблема при передаче моего требования с использованием языка. Что мне нужно, чтобы отобразить запись с самым ранним значением отметки времени с каждым ключом. – Kepler

ответ

6

Это легко сделать непосредственно на DataFrame (странным именем context здесь):

val result = context 
    .groupBy("course_id", "student_id") 
    .agg(min("risk_date_time") as "risk_date_time") 

Затем вы можете преобразовать его в RDD (если необходимо), как и раньше, - результат имеет ту же схему.

Если вы хотите, чтобы выполнить это по РДУ, используйте reduceByKey:

studentRDD.reduceByKey((t1, t2) => if (t1.before(t2)) t1 else t2) 
+0

он дает ошибку при копировании при попытке с фреймом данных, как вы упомянули в качестве первого варианта? – Kepler

+0

Что не удается скомпилировать? Вам может потребоваться добавить 'import org.apache.spark.sql.functions._', чтобы получить функцию' min' в области –

+0

Мне нужно сделать одно замечание: t1 и t2 относятся к двум записям RDD и t1 представляет составные student_id и course_id? – Kepler

2

Первый код предоставить неверные результаты, так как уменьшить неверен. Функция уменьшения возвращает int (from compareTo) вместо пары x, y, но int не имеет члена ._2. Чтобы исправить эту попытку:

studentRDD.collect().map(z => (z._1, z._2)).reduce((x ,y) => if (x._2.compareTo(y._2) < 0) x else y)._1 

В основном эта новая функция будет возвращать запись с меньшим временем, а затем на общий результат (наименьший) вы берете ключ.

Обратите внимание, что вы делаете все это на драйвере из-за сбора. Нет причин собирать, сопоставлять и сокращать работу над RDD, чтобы вы могли получить тот же результат (и все еще быть масштабируемым), выполнив следующие действия: studentRDD.map (z => (z._1, z._2)). ((х, у) => если (x._2.compareTo (y._2) < 0) х еще у) ._ 1

Вы можете сделать это прямо из вашего контекста dataframe хотя:

val targetRow = context.agg(min(struct('risk_date_time, 'course_id, 'student_id)) as "rec").select($"rec.*").collect()(0) 
val key = (targetRow.getString(1), targetRow.getString(2))