我喜欢Spark数据集,因为它们在编译时给我分析错误和语法错误,还允许我使用getter而不是硬编码的名称/数字。大多数计算可以用Dataset的高级api完成。例如,通过访问Dataset类型对象的agg、select、sum、avg、map、filter或groupBy操作要比使用RDD行的数据字段简单得多。
然而,这里缺少连接操作,我读到我可以做这样的连接
ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
但这不是我想要的,因为我更喜欢通过case类接口来做,所以更像这样
ds1.joinWith(ds2, ds1.key === ds2.key, "inner")
目前最好的替代方法似乎是在case类旁边创建一个对象,并让这个函数为我提供正确的字符串列名。因此,我将使用第一行代码,但使用函数而不是硬编码的列名。但这感觉不够优雅…
谁能告诉我其他的选择吗?我们的目标是从实际的列名中抽象出来的,最好是通过case类的getter来工作。我正在使用Spark 1.6.1和Scala 2.10
观察
Spark SQL只能在连接条件为相等操作符的情况下优化连接。这意味着我们可以分别考虑相等连接和非相等连接。
等联结
Equijoin可以通过将Datasets
映射到(键,值)元组,基于键执行连接,并重塑结果,以类型安全的方式实现:
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset
def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
(f: T => K, g: U => K)
(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
val ds1_ = ds1.map(x => (f(x), x))
val ds2_ = ds2.map(x => (g(x), x))
ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}
Non-equijoin
可以用关系代数算子表示为R
火花2.0
使能crossJoin
和使用joinWith
与平凡相等的谓词:
spark.conf.set("spark.sql.crossJoin.enabled", true)
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean) = {
ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}
火花2.1
使用crossJoin
方法:
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean)
(implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}
例子
case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)
val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS
safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)
指出应该注意的是,这些方法与直接的
joinWith
应用程序在质量上是不同的,并且需要昂贵的DeserializeToObject
/SerializeFromObject
转换(与直接的joinWith
可以对数据使用逻辑操作相比)。这类似于Spark 2.0 Dataset vs DataFrame中描述的行为。
如果你不局限于Spark SQL API,
frameless
为Datasets
提供了有趣的类型安全扩展(截至目前它只支持Spark 2.0):import frameless.TypedDataset val typedPoints1 = TypedDataset.create(points1) val typedPoints2 = TypedDataset.create(points2) typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
Dataset
API在1.6中不稳定,所以我认为在那里使用它没有意义。当然这种设计和描述性的名称是没有必要的。您可以很容易地使用类型类将这些方法隐式地添加到
Dataset
中,并且与内置签名没有冲突,因此两者都可以称为joinWith
。
另外,不类型安全的Spark API的另一个更大的问题是,当你加入两个Datasets
时,它会给你一个DataFrame
。然后你失去了原始两个数据集的类型。
val a: Dataset[A]
val b: Dataset[B]
val joined: Dataframe = a.join(b)
// what would be great is
val joined: Dataset[C] = a.join(b)(implicit func: (A, B) => C)