I have more than 2 tables, and I want to join them into a single table so that queries will be faster.
table-1
---------------
user | activityId
---------------
user1 | 123
user2 | 123
user3 | 123
user4 | 123
user5 | 123
---------------
table-2
---------------------------------
user | activityId | event-1-time
---------------------------------
user2 | 123 | 1001
user2 | 123 | 1002
user3 | 123 | 1003
user5 | 123 | 1004
---------------------------------
table-3
---------------------------------
user | activityId | event-2-time
---------------------------------
user2 | 123 | 10001
user5 | 123 | 10002
---------------------------------
Joining table-1 with table-2 and table-3 produces the following result:
joined data
--------------------------------------------------------------------
user | activityId | event-1 | event-1-time | event-2 | event-2-time
--------------------------------------------------------------------
user1 | 123 | 0 | null | 0 | null
user2 | 123 | 1 | 1001 | 1 | 10001
user2 | 123 | 1 | 1002 | 1 | 10001
user3 | 123 | 1 | 1003 | 0 | null
user4 | 123 | 0 | null | 0 | null
user5 | 123 | 1 | 1004 | 1 | 10002
--------------------------------------------------------------------
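I want to remove the redundancy introduced with event-2: for user2, event-2 occurred only once, but it is reported twice because event-1 occurred twice.
In other words, each record grouped by user and activityId should appear only once per source table.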
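To see where the duplication comes from, here is a minimal plain-Python model of the two left joins. It is not Spark code: the tuples simply copy the rows of the tables above, and `left_join` is a hypothetical helper. Because the join key is only (user, activityId), every event-2 row is matched with every event-1 row of the same key, so user2's single 10001 event appears once per event-1 row.

```python
# Plain-Python model (no Spark) of the question's left joins on (user, activityId).
# The tuples copy the rows of table-1, table-2 and table-3 above.
table1 = [("user1", 123), ("user2", 123), ("user3", 123), ("user4", 123), ("user5", 123)]
table2 = [("user2", 123, 1001), ("user2", 123, 1002), ("user3", 123, 1003), ("user5", 123, 1004)]
table3 = [("user2", 123, 10001), ("user5", 123, 10002)]


def left_join(left, right):
    """Hypothetical helper: left join on the first two fields (user, activityId).

    Each right table has exactly one non-key column, so an unmatched
    left row is padded with a single None."""
    out = []
    for l in left:
        matches = [r[2:] for r in right if r[:2] == l[:2]]
        if not matches:
            out.append(l + (None,))
        for m in matches:
            out.append(l + m)
    return out


joined = left_join(left_join(table1, table2), table3)
for row in joined:
    print(row)
# user2 has one event-2 row but two event-1 rows, so 10001 is emitted twice:
# ('user2', 123, 1001, 10001) and ('user2', 123, 1002, 10001)
```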
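I want the following output. I don't care about the relationship between event-1 and event-2. Is there any kind of custom join that achieves this behavior?
--------------------------------------------------------------------
user | activityId | event-1 | event-1-time | event-2 | event-2-time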
--------------------------------------------------------------------
user1 | 123 | 0 | null | 0 | null
user2 | 123 | 1 | 1001 | 1 | 10001
user2 | 123 | 1 | 1002 | 0 | null
user3 | 123 | 1 | 1003 | 0 | null
user4 | 123 | 0 | null | 0 | null
user5 | 123 | 1 | 1004 | 1 | 10002
--------------------------------------------------------------------
Edit:
I am using Scala to join these tables. The query I use:
import org.apache.spark.sql.functions._
val joined = table1.join(table2, Seq("user","activityId"), "left").join(table3, Seq("user","activityId"), "left")
joined.select(table1("user"), table1("activityId"),
  when(table2("activityId").isNull, 0).otherwise(1) as "event-1", table2("timestamp") as "event-1-time",
  when(table3("activityId").isNull, 0).otherwise(1) as "event-2", table3("timestamp") as "event-2-time").show
You should add a row index to each table with row_number and then do an outer join that also matches on that index:
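import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Number the rows of each (user, activityId) group so that the i-th event-1 row
// is matched with the i-th event-2 row instead of with every event-2 row.
// Ordering by the event time keeps the numbering deterministic; table1 has no
// time column, so it is ordered by activityId only.
val byKey = Window.partitionBy("user", "activityId")
val tempTable1 = table1.withColumn("rowNumber", row_number().over(byKey.orderBy("activityId")))
val tempTable2 = table2.withColumn("rowNumber", row_number().over(byKey.orderBy("event-1-time"))).withColumn("event-1", lit(1))
val tempTable3 = table3.withColumn("rowNumber", row_number().over(byKey.orderBy("event-2-time"))).withColumn("event-2", lit(1))
tempTable1
  .join(tempTable2, Seq("user", "activityId", "rowNumber"), "outer")
  .join(tempTable3, Seq("user", "activityId", "rowNumber"), "outer")
  .drop("rowNumber")
  // Fill only the flag columns; a plain .na.fill(0) would also turn the null times into 0.
  .na.fill(0L, Seq("event-1", "event-2"))
  .show(false)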
You should get the desired output dataframe:
+-----+----------+------------+-------+------------+-------+
|user |activityId|event-1-time|event-1|event-2-time|event-2|
+-----+----------+------------+-------+------------+-------+
|user1|123 |null |0 |null |0 |
|user2|123 |1002 |1 |null |0 |
|user2|123 |1001 |1 |10001 |1 |
|user3|123 |1003 |1 |null |0 |
|user4|123 |null |0 |null |0 |
|user5|123 |1004 |1 |10002 |1 |
+-----+----------+------------+-------+------------+-------+
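The mechanics of this answer can be checked without a cluster. The sketch below is a plain-Python model, not Spark code: `number_rows` is a hypothetical stand-in for `row_number()` over a window partitioned by user and activityId and ordered by time, and the union of the numbered keys stands in for the two outer joins on (user, activityId, rowNumber). With the question's data it produces the same rows as the output above (row order aside), with the columns in the same order.

```python
from collections import defaultdict

# Rows copied from table-1, table-2 and table-3 in the question.
table1 = [("user1", 123), ("user2", 123), ("user3", 123), ("user4", 123), ("user5", 123)]
table2 = [("user2", 123, 1001), ("user2", 123, 1002), ("user3", 123, 1003), ("user5", 123, 1004)]
table3 = [("user2", 123, 10001), ("user5", 123, 10002)]


def number_rows(rows):
    """Hypothetical stand-in for row_number() over partitionBy(user, activityId).orderBy(time).

    Returns {(user, activityId, rowNumber): time}; table-1 rows carry no time,
    so their value is None."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[:2]].append(row[2:])  # () for table-1, (time,) for event tables
    numbered = {}
    for key, rests in groups.items():
        for rn, rest in enumerate(sorted(rests), start=1):
            numbered[key + (rn,)] = rest[0] if rest else None
    return numbered


t1, t2, t3 = number_rows(table1), number_rows(table2), number_rows(table3)

# Full outer join on (user, activityId, rowNumber): every key seen in any table.
result = []
for key in sorted(set(t1) | set(t2) | set(t3)):
    e1, e2 = t2.get(key), t3.get(key)
    # rowNumber (key[2]) is dropped; only the flag columns get 0 for missing events.
    result.append((key[0], key[1], e1, 0 if e1 is None else 1, e2, 0 if e2 is None else 1))

for row in result:
    print(row)
```

Partitioning by both user and activityId (rather than by user alone) keeps the row numbers of different activities of the same user from interfering with each other in the join.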
Here is the code for this requirement:
# Assumes an interactive pyspark shell in which sc and sqlContext are already defined.
from pyspark.sql import Row
ll = [('test',123),('test',123),('test',123),('test',123)]
rdd = sc.parallelize(ll)
test1 = rdd.map(lambda x: Row(user=x[0], activityid=int(x[1])))
test1_df = sqlContext.createDataFrame(test1)
mm = [('test',123,1001),('test',123,1002),('test',123,1003),('test',123,1004)]
rdd1 = sc.parallelize(mm)
test2 = rdd1.map(lambda x: Row(user=x[0],
activityid=int(x[1]),event_time_1=int(x[2])))
test2_df = sqlContext.createDataFrame(test2)
nn = [('test',123,10001),('test',123,10002)]
rdd2 = sc.parallelize(nn)
test3 = rdd2.map(lambda x: Row(user=x[0],
activityid=int(x[1]),event_time_2=int(x[2])))
test3_df = sqlContext.createDataFrame(test3)
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
# Rank each (user, activityid) group's events by time and keep only rank 1,
# so each key contributes at most one row (ties aside) to the joins below.
n = Window.partitionBy(test2_df.user,test2_df.activityid).orderBy(test2_df.event_time_1)
int2_df = test2_df.select("user","activityid","event_time_1",rank().over(n).alias("col_rank")).filter('col_rank = 1')
o = Window.partitionBy(test3_df.user,test3_df.activityid).orderBy(test3_df.event_time_2)
int3_df = test3_df.select("user","activityid","event_time_2",rank().over(o).alias("col_rank")).filter('col_rank = 1')
# distinct() collapses the duplicate rows of test1_df before the left outer joins.
test1_df.distinct().join(int2_df,["user","activityid"],"leftouter").join(int3_df,["user","activityid"],"leftouter").show(10)
+----+----------+------------+--------+------------+--------+
|user|activityid|event_time_1|col_rank|event_time_2|col_rank|
+----+----------+------------+--------+------------+--------+
|test| 123| 1001| 1| 10001| 1|
+----+----------+------------+--------+------------+--------+
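Note that the `col_rank = 1` filter keeps only the earliest event of each (user, activityid) group, plus any rows tied with it. A plain-Python model of that filter (no Spark; `rank_one` is a hypothetical helper) applied to the question's table-2 shows the effect: the duplicated event-2 no longer occurs, but user2's second event-1 row (1002) is dropped as well, whereas the desired output in the question keeps it.

```python
from collections import defaultdict

# Rows copied from table-2 in the question.
table2 = [("user2", 123, 1001), ("user2", 123, 1002), ("user3", 123, 1003), ("user5", 123, 1004)]


def rank_one(rows):
    """Hypothetical model of rank() over partitionBy(user, activityid).orderBy(time)
    followed by filter('col_rank = 1'): keep every row tied for the smallest time."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[:2]].append(row)
    kept = []
    for group in groups.values():
        smallest = min(r[2] for r in group)
        kept.extend(r for r in group if r[2] == smallest)
    return kept


kept = rank_one(table2)
print(kept)
```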