从spark 2.0.1开始,我有一些问题。我读了很多文档,但到目前为止还没有找到足够的答案:
- 有什么区别
-
df.select("foo")
-
df.select($"foo")
-
- 我理解正确吗
-
myDataSet.map(foo.someVal)
是类型安全的,不会转换为RDD
,但保持在数据集表示/没有额外的开销(2.0.0的性能方面)
-
- 所有其他命令,例如select,…都是语法糖。它们不是类型安全的,可以使用映射来代替。如果没有map语句,我怎么能保证
df.select("foo")
类型安全?- 为什么我应该使用UDF/UADF而不是地图(假设地图停留在数据集表示)?
-
df.select("foo")
和df.select($"foo")
的区别是签名。前者至少使用一个String
,后者至少使用零个或多个Columns
。除此之外没有实际的区别。 -
myDataSet.map(foo.someVal)
类型检查,但由于任何Dataset
操作都使用RDD
的对象,并且与DataFrame
操作相比,存在显着的开销。让我们看一个简单的例子:case class FooBar(foo: Int, bar: String) val ds = Seq(FooBar(1, "x")).toDS ds.map(_.foo).explain
== Physical Plan == *SerializeFromObject [input[0, int, true] AS value#123] +- *MapElements <function1>, obj#122: int +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar +- LocalTableScan [foo#117, bar#118]
正如你所看到的,这个执行计划需要访问所有字段,并且必须访问
DeserializeToObject
。 -
。一般来说,其他方法不是语法糖,并且生成明显不同的执行计划。例如:
ds.select($"foo").explain
== Physical Plan == LocalTableScan [foo#117]
与之前所示的计划相比,它可以直接访问列。这与其说是API的限制,不如说是操作语义差异的结果。
-
如果没有map语句,我怎么能df.select("foo")类型安全?
没有这样的选项。而类型化列允许您将静态
Dataset
转换为另一个静态Dataset
:ds.select($"bar".as[Int])
没有类型安全。还有其他一些尝试包括类型安全的优化操作,如类型聚合,但这个实验性的API。
-
为什么我应该使用UDF/UADF而不是map
这完全取决于你。Spark中的每个分布式数据结构都有自己的优点和缺点(参见例如Spark UDAF使用ArrayType作为bufferSchema的性能问题)。
我个人认为静态类型的Dataset
是最没用的:
-
不提供与
Dataset[Row]
相同的优化范围(尽管它们共享存储格式和一些执行计划优化,但不能完全受益于代码生成或堆外存储),也不能访问DataFrame
的所有分析功能。 -
类型化转换是黑盒,有效地为优化器创建了分析障碍。例如,不能将选择(过滤器)推入类型转换:
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
:相比== Physical Plan == *Filter (foo#133 = 1) +- *Filter <function1>.apply +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) +- Exchange hashpartitioning(foo#133, 200) +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) +- LocalTableScan [foo#133, bar#134]
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
== Physical Plan == *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) +- Exchange hashpartitioning(foo#133, 200) +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) +- *Filter (foo#133 = 1) +- LocalTableScan [foo#133, bar#134]
这会影响谓词下推或投影下推等功能。
-
不像
RDDs
那样灵活,只有一小部分类型本地支持 当使用 - "类型安全"与
Encoders
存在争议。因为数据形状没有使用签名编码,编译器只能验证Encoder
的存在。
as
方法转换Dataset
时,相关问题:
- 在Scala中使用Spark数据集执行类型化连接
- Spark 2.0 DataSets groupByKey anddivide operation and type safety
Spark Dataset
比Spark Dataframe
更强大。一个小例子——你只能创建Row
、Tuple
或任何原始数据类型的Dataframe
,但Dataset
也可以创建任何非原始数据类型的Dataset
。也就是说,你可以直接创建对象类型的Dataset
。
,
case class Employee(id:Int,name:String)
Dataset[Employee] // is valid
Dataframe[Employee] // is invalid
DATAFRAME: DATAFRAME是一个抽象,允许数据的模式视图。
case class Person(name: String, age: Int, address: String)
已定义类Person
scala> val df = List (Person (" Sumanth ", 23, " BNG ")
DATAFRAME VS DATASET
DATASET: Data Set是Dataframe API的扩展,是最新的抽象,它试图同时提供RDD和Dataframe的优点。