Я новичок в искры-Кассандре и Скале. У меня есть RDD. скажем:Scala Spark Filter RDD с использованием Cassandra
((url_hash, url, created_timestamp)).
Я хочу отфильтровать этот RDD на основе url_hash. Если url_hash существует в таблице Cassandra, я хочу отфильтровать его из RDD, чтобы я мог обрабатывать только новые URL-адреса.
Cassandra Таблица выглядит следующим образом:
url_hash| url | created_timestamp | updated_timestamp
Все указатели будут велики.
Я пытался что-то вроде этого: это
case class UrlInfoT(url_sha256: String, full_url: String, created_ts: Date)
def timestamp = new java.utils.Date()
val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp)))
val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace", "url_info").select("url_sha256", "full_url", "created_ts")
val rdd3 = rdd2.map(row => (row.url_sha256,(row.full_url, row.created_ts)))
newUrlsRDD = rdd1.subtractByKey(rdd3)
я получаю ошибку Cassandra
java.lang.NullPointerException: Unexpected null value of column full_url in keyspace.url_info.If you want to receive null values from Cassandra, please wrap the column type into Option or use JavaBeanColumnMapper
Там нет нулевых значений в таблице Кассандры
Что вы пробовали? Поверните таблицу Cassandra в другой RDD, «сопоставьте» оба, так что у них есть «url_hash» в качестве ключа, а затем используйте 'subtractByKey'? –
Спасибо за указатель. Я обновил вопрос тем, что я пробовал. теперь я получаю исключение нулевого указателя – Abhishek