SparkSQL正在连接父/子数据集



我使用SparkSQL 2.2.0从Cassandra加载数据并对其进行索引Elasticsearch。我拥有的数据包括客户(第一个表people)和订单(第二个表orders)
表格订单有一列person_id,它指向相应的客户
我需要查询(稍后在Elasticsearch中索引)people表和orders,这样我就可以为每个客户获得她购买的订单数量
我找到的最简单的方法是将这两个表读取到org.apache.spark.sql.Dataset<Row>中,并在person_id列上进行联接。然后我CCD_ 8
这给了我一个包含两列的数据集:person_idcount,我必须将它们与people表连接起来,这样我就可以与其他人的数据进行计数。

Dataset<Row> peopleWithOrders = people.join(orders, people.col("id").equalTo(orders.col("person_id")), "left_outer");
Dataset<Row> peopleOrdersCounts = peopleWithOrders.groupBy("id").count().withColumnRenamed("id", "personId");
Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer")
.withColumnRenamed("count", "nbrOfOrders")
.select("id", "name", "birthDate", "nbrOfOrders");

people表有1_000_000行,orders表有2_500_000行。每个客户有2到3个订单
我使用的是MAC Book pro,带有2,2 GHz Intel Core i7处理器和16 GB 1600 MHz DDR3内存。Cassandra、Spark 2.2 master和(单个)worker都在同一台机器上
这3个加入需要15到20秒
我的问题是:是否还有提高绩效的空间。Do窗口聚合函数有好处,因为我在日志中看到了ShuffleMapTask。

提前感谢

我认为第一步是不必要的。你可以这样做:

Dataset<Row> peopleOrdersCounts = orders.groupBy("person_id").count();
Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer")
.withColumnRenamed("count", "nbrOfOrders")
.select("id", "name", "birthDate", "nbrOfOrders");

我希望这能有所帮助。

相关内容

  • 没有找到相关文章