我使用SparkSQL 2.2.0从Cassandra加载数据并对其进行索引Elasticsearch。我拥有的数据包括客户(第一个表people
)和订单(第二个表orders
)
表格订单有一列person_id
,它指向相应的客户
我需要查询(稍后在Elasticsearch中索引)people
表和orders
,这样我就可以为每个客户获得她购买的订单数量
我找到的最简单的方法是将这两个表读取到org.apache.spark.sql.Dataset<Row>
中,并在person_id
列上进行联接。然后我CCD_ 8
这给了我一个包含两列的数据集:person_id
和count
,我必须将它们与people
表连接起来,这样我就可以与其他人的数据进行计数。
Dataset<Row> peopleWithOrders = people.join(orders, people.col("id").equalTo(orders.col("person_id")), "left_outer");
Dataset<Row> peopleOrdersCounts = peopleWithOrders.groupBy("id").count().withColumnRenamed("id", "personId");
Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer")
.withColumnRenamed("count", "nbrOfOrders")
.select("id", "name", "birthDate", "nbrOfOrders");
people
表有1_000_000行,orders
表有2_500_000行。每个客户有2到3个订单
我使用的是MAC Book pro,带有2,2 GHz Intel Core i7处理器和16 GB 1600 MHz DDR3内存。Cassandra、Spark 2.2 master和(单个)worker都在同一台机器上
这3个加入需要15到20秒
我的问题是:是否还有提高绩效的空间。Do窗口聚合函数有好处,因为我在日志中看到了ShuffleMapTask。
提前感谢
我认为第一步是不必要的。你可以这样做:
Dataset<Row> peopleOrdersCounts = orders.groupBy("person_id").count();
Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer")
.withColumnRenamed("count", "nbrOfOrders")
.select("id", "name", "birthDate", "nbrOfOrders");
我希望这能有所帮助。