我正在使用Scala版本2.11.12(OpenJDK 64位服务器虚拟机,Java 1.8.0_302(开发Spark版本2.4.7-amzn-1的AWS EMR。
我想通过键来减少我的自定义用例类Item
的数据集,其中键本身就是一个自定义用例类。然而,reduceByKey
并没有像我预期的那样工作。
以下是两个类别:
case class Key(
name: String,
color: String
)
case class Item(
name: String,
color: String,
count: Int
) {
def key: Key = Key(name, color)
}
为了进行聚合,我在Item
的伴随对象中定义了一个自定义组合函数,该函数只将计数相加:
object Item {
def combine(i1: Item, i2: Item): Item = i1.copy(count = i1.count + i2.count)
}
这是我的聚合函数:
import org.apache.spark.sql.Dataset
import spark.implicits._
def aggregate(items: Dataset[Item]): Dataset[Item] = items
.rdd
.keyBy(_.key)
.reduceByKey(Item.combine)
.map(_._2)
.toDS
现在,如果我尝试聚合。。。
val items: Dataset[Item] = spark.sparkContext.parallelize(
Seq(
Item("Square", "green", 8),
Item("Triangle", "blue", 3),
Item("Square", "green", 5),
Item("Triangle", "blue", 7)
)
).toDS
val aggregated: Dataset[Item] = aggregate(items)
aggregated.show
输出显示数据集没有减少:
+--------+-----+-----+
| name|color|count|
+--------+-----+-----+
| Square|green| 8|
| Square|green| 5|
|Triangle| blue| 3|
|Triangle| blue| 7|
+--------+-----+-----+
然而,当我更改序列中4个项目的顺序时,我观察到聚合确实起作用,因此结果不一致。
如果我将密钥从案例类实例更改为
def key: Key = Key(name, color)
成为元组
def key: Tuple2[String, String] = (name, color)
聚合按预期工作,给出以下输出:
+--------+-----+-----+
| name|color|count|
+--------+-----+-----+
| Square|green| 13|
|Triangle| blue| 10|
+--------+-----+-----+
那么,reduceByKey
通常不能(可靠地(与案例类一起工作吗?这是预期的行为吗?或者这与case类与tuple无关,而真正的原因隐藏在其他地方?我的Key
类对我来说似乎很简单,所以我想,这不是一个散列或比较问题。(我可能错了。(
我还研究了这个问题reduceByKey,使用Scala对象作为键,但原因是打字错误,chrisbtk明确表示:";Spark知道如何比较两个对象,即使它们没有实现Ordered">
我总是要用元组作为密钥吗?
尝试直接使用数据集API:
拥有:
import sparkSession.implicits._
import org.apache.spark.sql.Encoders
implicit val key: Encoder[Key] = Encoders.product[Key]
你可以做:
items
.groupByKey(_.key)
.reduceGroups(Item.combine)
.map(_._2)