15

Начиная с искры 2.0.1 У меня возникли вопросы. Я читал много документации, но до сих пор не удалось найти достаточное количество ответов:Spark 2.0 Dataset vs DataFrame

  • В чем разница между
    • df.select("foo")
    • df.select($"foo")
  • я правильно понимаю, что
    • myDataSet.map(foo.someVal) является типичным и wi ll не конвертировать в RDD, но оставаться в представлении DataSet/без дополнительных накладных расходов (производительность по 2,0.0)
  • все другие команды, например. выберите, .. просто синтаксический сахар. Они не являются типичными, и вместо них можно использовать карту. Как я могу использовать df.select("foo") без указания карты?
    • Почему я должен использовать UDF/UADF вместо карты (при условии, что карта остается в представлении набора данных)?
+0

Там это проект, который направлен на обеспечение большей безопасности типа для Спарк, оставаясь на эффективном пути выполнения: [typelevel/бескаркасная ] (https://github.com/typelevel/frameless) –

ответ

11
  1. Разница между df.select("foo") и df.select($"foo") является подпись. Первый берет как минимум один String, позже один ноль или более Columns. После этого нет никакой практической разницы.
  2. myDataSet.map(foo.someVal) тип чеков, но как и любая операция Dataset использует RDD объектов, а по сравнению с DataFrame операций имеется значительная накладная. Давайте рассмотрим простой пример:

    case class FooBar(foo: Int, bar: String) 
    val ds = Seq(FooBar(1, "x")).toDS 
    ds.map(_.foo).explain 
    
    == Physical Plan == 
    *SerializeFromObject [input[0, int, true] AS value#123] 
    +- *MapElements <function1>, obj#122: int 
        +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar 
         +- LocalTableScan [foo#117, bar#118] 
    

    Как вы можете видеть этот план выполнения требует доступа ко всем полям и должен DeserializeToObject.

  3. №. В общем, другие методы не являются синтаксическим сахаром и создают значительно отличающийся план выполнения. Например:

    ds.select($"foo").explain 
    
    == Physical Plan == 
    LocalTableScan [foo#117] 
    

    По сравнению с планом ранее показанного он может получить доступ колонки непосредственно. Это не столько ограничение API, сколько результат в функциональной семантике.

  4. Как я могу использовать df.select ("foo") без указания карты?

    Нет такого варианта. В то время как набранные столбцы позволяют превратить статически Dataset в другой статически типизированных Dataset:

    ds.select($"bar".as[Int]) 
    

    там не типобезопасен.Там были и другие попытки включить оптимизированные по типу операции, like typed aggregations, но этот экспериментальный API.

  5. почему я должен использовать UDF/UADF вместо карты

    Это полностью зависит от вас. Каждая распределенная структура данных в Spark предоставляет свои преимущества и недостатки (см., Например, Spark UDAF with ArrayType as bufferSchema performance issues).

Лично я считаю статически типизированных Dataset быть наименее полезным:

  • Не обеспечивают тот же диапазон оптимизаций как Dataset[Row] (хотя они разделяют формат хранения и некоторые оптимизации плана исполнения оно не в полной мере извлекает выгоду из создания кода или хранения вне кучи), ни доступа ко всем аналитическим возможностям DataFrame.

  • Типизированные преобразования - это черные ящики и эффективно создают барьер для анализа для оптимизатора. Например выборы (фильтры) не может быть толкание над типизированной трансформацией:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain 
    
    == Physical Plan == 
    *Filter (foo#133 = 1) 
    +- *Filter <function1>.apply 
        +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) 
         +- Exchange hashpartitioning(foo#133, 200) 
         +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) 
          +- LocalTableScan [foo#133, bar#134] 
    

    По сравнению с:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain 
    
    == Physical Plan == 
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) 
    +- Exchange hashpartitioning(foo#133, 200) 
        +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) 
         +- *Filter (foo#133 = 1) 
         +- LocalTableScan [foo#133, bar#134] 
    

    Это влияет функция, такой как предикат Pushdown или проекции магазинные.

  • Существует не так гибко, как RDDs с небольшим подмножеством поддерживаемых типов.

  • «Тип безопасности» с Encoders является спорным, когда Dataset преобразуется с использованием метода as. Поскольку форма данных не кодируется с использованием сигнатуры, компилятор может проверить только наличие Encoder.

Похожие вопросы:

 Смежные вопросы

  • Нет связанных вопросов^_^