1

Я использую Structured Streaming от Spark 2.1 для чтения из темы Kafka, содержимое которой двоично кодируется в формате avro.Почему Spark DataFrames не изменяет свою схему и что с ней делать?

Таким образом, после установки DataFrame:

val messages = spark 
    .readStream 
    .format("kafka") 
    .options(kafkaConf) 
    .option("subscribe", config.getString("kafka.topic")) 
    .load() 

Если напечатать схему этого DataFrame (messages.printSchema()), я получаю следующее:

root 
|-- key: binary (nullable = true) 
|-- value: binary (nullable = true) 
|-- topic: string (nullable = true) 
|-- partition: integer (nullable = true) 
|-- offset: long (nullable = true) 
|-- timestamp: long (nullable = true) 
|-- timestampType: integer (nullable = true) 

Этот вопрос должен быть ортогонален проблема автоматического декодирования, но предположим, что я хочу каким-то образом преобразовать содержимое value из сообщений DataFrame в Dataset[BusinessObject], с помощью функции Array[Byte] => BusinessObject. Например, полнота, функция может быть просто (с использованием avro4s):

case class BusinessObject(userId: String, eventId: String) 

def fromAvro(bytes: Array[Byte]): BusinessObject = 
    AvroInputStream.binary[BusinessObject](
     new ByteArrayInputStream(bytes) 
    ).iterator.next 

Конечно, as miguno says in this related question я не могу просто применить преобразование с DataFrame.map(), потому что мне нужно, чтобы обеспечить неявный кодировщик для такого BusinessObject.

Это может быть определено как:

implicit val myEncoder : Encoder[BusinessObject] = org.apache.spark.sql.Encoders.kryo[BusinessObject] 

Теперь выполнить карту:

val transformedMessages : Dataset[BusinessObjecŧ] = messages.map(row => fromAvro(row.getAs[Array[Byte]]("value"))) 

Но если я запрашиваю новую схему, я получаю следующее:

root 
|-- value: binary (nullable = true) 

И я думаю, что это не имеет никакого смысла, так как в наборе данных должны использоваться свойства Product класса BusinessObject case-class и получить правильные значения.

Я видел несколько примеров на Spark SQL с использованием .schema(StructType) в читателе, но я не могу этого сделать, не только потому, что я использую readStream, а потому, что мне действительно нужно преобразовать столбец, прежде чем он сможет работать в таких поля.

Я надеюсь сообщить движку Spark SQL, что схема набора данных transformedMessages - это StructField с полями класса case.

ответ

1

Я бы сказал, что вы получаете именно то, о чем просите. Как I already explained todayEncoders.kryo генерирует blob с сериализованным объектом. Его внутренняя структура непрозрачна для механизма SQL и не может быть доступна без десериализации объекта. Настолько эффективно, что ваш код делает, принимает один формат сериализации и заменяет его другим.

Еще одна проблема заключается в том, что вы пытаетесь смешивать динамически типизированный DataFrame (Dataset[Row]) со статически типизированным объектом. Исключение UDT API Spark SQL не работает так. Либо вы используете статически Dataset, либо DataFrame с структурой объекта, закодированной с использованием иерархии struct.

Хорошая новость - это простые типы продуктов, такие как BusinessObject, должны работать отлично, без необходимости неуклюжими Encoders.kryo.Просто пропустить Kryo определение датчика и обязательно импортировать неявные кодеров:

import spark.implicits._ 
+1

Я просто решить эту проблему, и этот вопрос не * действительно * применяется, потому что это было из-за чего-то не указано в тексте. (My BusinessObject имел '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' – ssice