-2

Я новичок в искры-Кассандре и Скале. У меня есть RDD. скажем:Scala Spark Filter RDD с использованием Cassandra

((url_hash, url, created_timestamp)).

Я хочу отфильтровать этот RDD на основе url_hash. Если url_hash существует в таблице Cassandra, я хочу отфильтровать его из RDD, чтобы я мог обрабатывать только новые URL-адреса.

Cassandra Таблица выглядит следующим образом:

url_hash| url | created_timestamp | updated_timestamp 

Все указатели будут велики.

Я пытался что-то вроде этого: это

case class UrlInfoT(url_sha256: String, full_url: String, created_ts: Date) 
    def timestamp = new java.utils.Date() 
    val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp))) 
    val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace", "url_info").select("url_sha256", "full_url", "created_ts") 
    val rdd3 = rdd2.map(row => (row.url_sha256,(row.full_url, row.created_ts))) 
    newUrlsRDD = rdd1.subtractByKey(rdd3) 

я получаю ошибку Cassandra

java.lang.NullPointerException: Unexpected null value of column full_url in  keyspace.url_info.If you want to receive null values from Cassandra, please wrap the column type into Option or use JavaBeanColumnMapper 

Там нет нулевых значений в таблице Кассандры

+1

Что вы пробовали? Поверните таблицу Cassandra в другой RDD, «сопоставьте» оба, так что у них есть «url_hash» в качестве ключа, а затем используйте 'subtractByKey'? –

+0

Спасибо за указатель. Я обновил вопрос тем, что я пробовал. теперь я получаю исключение нулевого указателя – Abhishek

ответ

1

Спасибо Архетипический Paul!

Я надеюсь, что кто-то посчитает это полезным. Пришлось добавить вариант к классу case.

Ждем лучших решений

case class UrlInfoT(url_sha256: String, full_url: Option[String], created_ts: Option[Date]) 

def timestamp = new java.utils.Date() 
val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp))) 
val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace", "url_info").select("url_sha256", "full_url", "created_ts") 
val rdd3 = rdd2.map(row => (row.url_sha256,(row.full_url, row.created_ts))) 
newUrlsRDD = rdd1.subtractByKey(rdd3)