Я использую Structured Streaming от Spark 2.1 для чтения из темы Kafka, содержимое которой двоично кодируется в формате avro.Почему Spark DataFrames не изменяет свою схему и что с ней делать?
Таким образом, после установки DataFrame
:
val messages = spark
.readStream
.format("kafka")
.options(kafkaConf)
.option("subscribe", config.getString("kafka.topic"))
.load()
Если напечатать схему этого DataFrame
(messages.printSchema()
), я получаю следующее:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: long (nullable = true)
|-- timestampType: integer (nullable = true)
Этот вопрос должен быть ортогонален проблема автоматического декодирования, но предположим, что я хочу каким-то образом преобразовать содержимое value
из сообщений DataFrame
в Dataset[BusinessObject]
, с помощью функции Array[Byte] => BusinessObject
. Например, полнота, функция может быть просто (с использованием avro4s):
case class BusinessObject(userId: String, eventId: String)
def fromAvro(bytes: Array[Byte]): BusinessObject =
AvroInputStream.binary[BusinessObject](
new ByteArrayInputStream(bytes)
).iterator.next
Конечно, as miguno says in this related question я не могу просто применить преобразование с DataFrame.map()
, потому что мне нужно, чтобы обеспечить неявный кодировщик для такого BusinessObject
.
Это может быть определено как:
implicit val myEncoder : Encoder[BusinessObject] = org.apache.spark.sql.Encoders.kryo[BusinessObject]
Теперь выполнить карту:
val transformedMessages : Dataset[BusinessObjecŧ] = messages.map(row => fromAvro(row.getAs[Array[Byte]]("value")))
Но если я запрашиваю новую схему, я получаю следующее:
root
|-- value: binary (nullable = true)
И я думаю, что это не имеет никакого смысла, так как в наборе данных должны использоваться свойства Product класса BusinessObject
case-class и получить правильные значения.
Я видел несколько примеров на Spark SQL с использованием .schema(StructType)
в читателе, но я не могу этого сделать, не только потому, что я использую readStream
, а потому, что мне действительно нужно преобразовать столбец, прежде чем он сможет работать в таких поля.
Я надеюсь сообщить движку Spark SQL, что схема набора данных transformedMessages
- это StructField
с полями класса case.
Я просто решить эту проблему, и этот вопрос не * действительно * применяется, потому что это было из-за чего-то не указано в тексте. (My BusinessObject имел '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' – ssice