Spark 2.0 Dataset vs DataFrame



从spark 2.0.1开始,我有一些问题。我读了很多文档,但到目前为止还没有找到足够的答案:

  • 有什么区别
    • df.select("foo")
    • df.select($"foo")
  • 我理解正确吗
    • myDataSet.map(foo.someVal)是类型安全的,不会转换为RDD,但保持在数据集表示/没有额外的开销(2.0.0的性能方面)
  • 所有其他命令,例如select,…都是语法糖。它们不是类型安全的,可以使用映射来代替。如果没有map语句,我怎么能保证df.select("foo")类型安全?
    • 为什么我应该使用UDF/UADF而不是地图(假设地图停留在数据集表示)?
  1. df.select("foo")df.select($"foo")的区别是签名。前者至少使用一个String,后者至少使用零个或多个Columns。除此之外没有实际的区别。
  2. myDataSet.map(foo.someVal)类型检查,但由于任何Dataset操作都使用RDD的对象,并且与DataFrame操作相比,存在显着的开销。让我们看一个简单的例子:

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain
    
    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]
    

    正如你所看到的,这个执行计划需要访问所有字段,并且必须访问DeserializeToObject

  3. 。一般来说,其他方法不是语法糖,并且生成明显不同的执行计划。例如:

    ds.select($"foo").explain
    
    == Physical Plan ==
    LocalTableScan [foo#117]
    

    与之前所示的计划相比,它可以直接访问列。这与其说是API的限制,不如说是操作语义差异的结果。

  4. 如果没有map语句,我怎么能df.select("foo")类型安全?

    没有这样的选项。而类型化列允许您将静态Dataset转换为另一个静态Dataset:

    ds.select($"bar".as[Int])
    

    没有类型安全。还有其他一些尝试包括类型安全的优化操作,如类型聚合,但这个实验性的API。

  5. 为什么我应该使用UDF/UADF而不是map

    这完全取决于你。Spark中的每个分布式数据结构都有自己的优点和缺点(参见例如Spark UDAF使用ArrayType作为bufferSchema的性能问题)。

我个人认为静态类型的Dataset是最没用的:

  • 不提供与Dataset[Row]相同的优化范围(尽管它们共享存储格式和一些执行计划优化,但不能完全受益于代码生成或堆外存储),也不能访问DataFrame的所有分析功能。

  • 类型化转换是黑盒,有效地为优化器创建了分析障碍。例如,不能将选择(过滤器)推入类型转换:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
    
    == Physical Plan ==
    *Filter (foo#133 = 1)
    +- *Filter <function1>.apply
       +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
          +- Exchange hashpartitioning(foo#133, 200)
             +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
                +- LocalTableScan [foo#133, bar#134]
    
    :相比

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
    
    == Physical Plan ==
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
    +- Exchange hashpartitioning(foo#133, 200)
       +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
          +- *Filter (foo#133 = 1)
             +- LocalTableScan [foo#133, bar#134] 
    

    这会影响谓词下推或投影下推等功能。

  • 不像RDDs那样灵活,只有一小部分类型本地支持

  • 当使用as方法转换Dataset时,
  • "类型安全"与Encoders存在争议。因为数据形状没有使用签名编码,编译器只能验证Encoder的存在。

相关问题:

  • 在Scala中使用Spark数据集执行类型化连接
  • Spark 2.0 DataSets groupByKey anddivide operation and type safety

Spark Dataset比Spark Dataframe更强大。一个小例子——你只能创建RowTuple或任何原始数据类型的Dataframe,但Dataset也可以创建任何非原始数据类型的Dataset。也就是说,你可以直接创建对象类型的Dataset

,

case class Employee(id:Int,name:String)
Dataset[Employee]   // is valid
Dataframe[Employee] // is invalid

DATAFRAME: DATAFRAME是一个抽象,允许数据的模式视图。

case class Person(name: String, age: Int, address: String)

已定义类Person

scala> val df = List (Person (" Sumanth ", 23, " BNG ")

DATAFRAME VS DATASET

DATASET: Data Set是Dataframe API的扩展,是最新的抽象,它试图同时提供RDD和Dataframe的优点。

相关内容

  • 没有找到相关文章