reduceByKey,使用case类实例作为密钥



我正在使用Scala版本2.11.12(OpenJDK 64位服务器虚拟机,Java 1.8.0_302(开发Spark版本2.4.7-amzn-1的AWS EMR。

我想通过键来减少我的自定义用例类Item的数据集,其中键本身就是一个自定义用例类。然而,reduceByKey并没有像我预期的那样工作。

以下是两个类别:

case class Key(
name: String,
color: String
)
case class Item(
name: String,
color: String,
count: Int
) {
def key: Key = Key(name, color)
}

为了进行聚合,我在Item的伴随对象中定义了一个自定义组合函数,该函数只将计数相加:

object Item {
def combine(i1: Item, i2: Item): Item = i1.copy(count = i1.count + i2.count)
}

这是我的聚合函数:

import org.apache.spark.sql.Dataset
import spark.implicits._
def aggregate(items: Dataset[Item]): Dataset[Item] = items
.rdd
.keyBy(_.key)
.reduceByKey(Item.combine)
.map(_._2)
.toDS

现在,如果我尝试聚合。。。

val items: Dataset[Item] = spark.sparkContext.parallelize(
Seq(
Item("Square", "green", 8),
Item("Triangle", "blue", 3),
Item("Square", "green", 5),
Item("Triangle", "blue", 7)
)
).toDS
val aggregated: Dataset[Item] = aggregate(items)
aggregated.show

输出显示数据集没有减少:

+--------+-----+-----+
|    name|color|count|
+--------+-----+-----+
|  Square|green|    8|
|  Square|green|    5|
|Triangle| blue|    3|
|Triangle| blue|    7|
+--------+-----+-----+

然而,当我更改序列中4个项目的顺序时,我观察到聚合确实起作用,因此结果不一致。

如果我将密钥从案例类实例更改为

def key: Key = Key(name, color)

成为元组

def key: Tuple2[String, String] = (name, color)

聚合按预期工作,给出以下输出:

+--------+-----+-----+
|    name|color|count|
+--------+-----+-----+
|  Square|green|   13|
|Triangle| blue|   10|
+--------+-----+-----+

那么,reduceByKey通常不能(可靠地(与案例类一起工作吗?这是预期的行为吗?或者这与case类与tuple无关,而真正的原因隐藏在其他地方?我的Key类对我来说似乎很简单,所以我想,这不是一个散列或比较问题。(我可能错了。(

我还研究了这个问题reduceByKey,使用Scala对象作为键,但原因是打字错误,chrisbtk明确表示:";Spark知道如何比较两个对象,即使它们没有实现Ordered">

我总是要用元组作为密钥吗?

尝试直接使用数据集API:

拥有:

import sparkSession.implicits._
import org.apache.spark.sql.Encoders
implicit val key: Encoder[Key] = Encoders.product[Key]

你可以做:

items
.groupByKey(_.key)
.reduceGroups(Item.combine)
.map(_._2)

最新更新