大家好,首先,根据标题,有人可能会说这个问题已经回答了,但我的观点是比较ReduceBykey,GroupBykey的性能,特别是数据集和RDD API。 我在许多帖子中看到,ReduceBykey方法的性能比GroupByKey更有效,当然我同意这一点。尽管如此,我还是有点困惑,如果我们使用数据集或RDD,我无法弄清楚这些方法的行为方式。每个案例应该使用哪一个?
我将尝试更具体,因此我将提供我的问题以及我的解决方案以及工作代码,我正在等待您尽早方便时向我提出改进建议。
+---+------------------+-----+
|id |Text1 |Text2|
+---+------------------+-----+
|1 |one,two,three |one |
|2 |four,one,five |six |
|3 |seven,nine,one,two|eight|
|4 |two,three,five |five |
|5 |six,five,one |seven|
+---+------------------+-----+
这里的重点是检查第二个列的每一行上是否包含第三个列,然后收集它们的所有 ID。例如,第三列"一"的单词出现在第二列的句子中,ID 为 1、5、2、3。
+-----+------------+
|Text2|Set |
+-----+------------+
|seven|[3] |
|one |[1, 5, 2, 3]|
|six |[5] |
|five |[5, 2, 4] |
+-----+------------+
这是我的工作代码
List<Row> data = Arrays.asList(
RowFactory.create(1, "one,two,three", "one"),
RowFactory.create(2, "four,one,five", "six"),
RowFactory.create(3, "seven,nine,one,two", "eight"),
RowFactory.create(4, "two,three,five", "five"),
RowFactory.create(5, "six,five,one", "seven")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("Text1", DataTypes.StringType, false, Metadata.empty()),
new StructField("Text2", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
df.show(false);
Dataset<Row> df1 = df.select("id", "Text1")
.crossJoin(df.select("Text2"))
.filter(col("Text1").contains(col("Text2")))
.orderBy(col("Text2"));
df1.show(false);
Dataset<Row> df2 = df1
.groupBy("Text2")
.agg(collect_set(col("id")).as("Set"));
df2.show(false);
我的问题在 3 个子序列中详细说明:
- 为了提高性能,我是否需要在RDD中转换数据集并使ReduceBykey而不是Dataset groupby?
- 我应该使用哪一个,为什么?数据集或RDD
- 如果您能提供一种更有效的替代解决方案,我将不胜感激
TL;DR两者都不好,但如果您正在使用Dataset
请留在Dataset
.
如果与合适的功能一起使用,Dataset.groupBy
的行为就像reduceByKey
一样。不幸的是,如果重复项的数量很少,collect_set
的行为与groupByKey
非常相似。用reduceByKey
重写它不会改变任何事情。
如果您能提供一种更有效的替代解决方案,我将不胜感激
你能做的最好的事情是删除crossJoin
:
val df = Seq((1, "one,two,three", "one"),
(2, "four,one,five", "six"),
(3, "seven,nine,one,two", "eight"),
(4, "two,three,five", "five"),
(5, "six,five,one", "seven")).toDF("id", "text1", "text2")
df.select(col("id"), explode(split(col("Text1"), ",")).alias("w"))
.join(df.select(col("Text2").alias("w")), Seq("w"))
.groupBy("w")
.agg(collect_set(col("id")).as("Set")).show
+-----+------------+
| w| Set|
+-----+------------+
|seven| [3]|
| one|[1, 5, 2, 3]|
| six| [5]|
| five| [5, 2, 4]|
+-----+------------+