2015-06-23 2 views
0

Я пытаюсь использовать пользовательские классы case для записи в Cassandra (2.1.6) с помощью Spark (1.4.0). До сих пор, я попытался это с помощью DataStax spark-cassandra-connector 1.4.0-M1 и следующие классы случай:Как преобразовать вложенный класс case в тип UDTValue

case class Event(event_id: String, event_name: String, event_url: String, time: Option[Long]) 
[...] 
case class RsvpResponse(event: Event, group: Group, guests: Long, member: Member, mtime: Long, response: String, rsvp_id: Long, venue: Option[Venue]) 

Для того, чтобы сделать эту работу, я также реализован следующий конвертер:

implicit object EventToUDTValueConverter extends TypeConverter[UDTValue] { 
    def targetTypeTag = typeTag[UDTValue] 
    def convertPF = { 
    case e: Event => UDTValue.fromMap(toMap(e)) // toMap just transforms the case class into a Map[String, Any] 
    } 
} 

TypeConverter.registerConverter(EventToUDTValueConverter) 

Если Я смотрю конвертер вручную, я могу использовать его, чтобы преобразовать экземпляр Event в UDTValue, однако, при использовании sc.saveToCassandra передавая ему экземпляр RsvpResponse с соответствующими объектами, я получаю следующее сообщение об ошибке:

15/06/23 23:56:29 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) 
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Event(EVENT9136830076436652815,First event,http://www.meetup.com/first-event,Some(1435100185774)) of type class model.Event to com.datastax.spark.connector.UDTValue. 
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:42) 
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1$$anonfun$convertPF$1.applyOrElse(UserDefinedType.scala:33) 
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:40) 
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1.convert(UserDefinedType.scala:31) 
    at com.datastax.spark.connector.writer.DefaultRowWriter$$anonfun$readColumnValues$2.apply(DefaultRowWriter.scala:46) 
    at com.datastax.spark.connector.writer.DefaultRowWriter$$anonfun$readColumnValues$2.apply(DefaultRowWriter.scala:43) 

Кажется, мой конвертер никогда не звонит из-за того, как библиотека соединителей обрабатывает UDTValue внутренне. Однако описанное выше решение работает для чтения данных из таблиц Cassandra (включая определенные пользователем типы). Основываясь на connector docs, я также заменил свои вложенные классы case com.datastax.spark.connector.UDTValue, которые затем исправляют описанную проблему, но прерывают чтение данных. Я не могу себе представить, что я должен определить две отдельные модели для чтения и записи данных. Или я пропущу что-то очевидное здесь?

ответ

0

Начиная с версии 1.3, нет необходимости использовать преобразователи пользовательского типа для загрузки и сохранения вложенных UDT. Просто моделируйте все с примерами классов и придерживайтесь соглашения об именах полей, и все должно быть в порядке.

+0

Я пробовал это сейчас с 1.4.0-M2, и он отлично работает. Просматривая примечания к выпуску GitHub, похоже, что это могло быть исправлено как часть https://datastax-oss.atlassian.net/browse/SPARKC-190 (1.3.0-M2) и выведено в 1.4.0- M2. –