我首先说我是SQL的新手,所以这个问题可能很琐碎。我有两张带时间戳键的桌子。对于table 1
中的每个事件t_i
,我希望table 2
中的所有事件q
使得:
q.timeStamp < t_i.timeStamp and q.timeStamp > t_{i-1}.timeStamp
也就是说,如果事件按照时间戳的顺序发生:
q1
t1 q2
q3
q4
t2 q5
q6
t3 q7
那么结果查询应该是:
t1: q1
t2: q2 q3 q4
t3: q5 q6
我将Scala与带有DataSet和DataFrame类的SQL Spark一起使用,所以无论是纯函数的"groupBy"还是SQL查询都是不错的。
首先,它并不是一个非常"简单"的查询。。。
首先,让我们用一些示例数据创建数据帧-我创建了只有时间和字符串值的小案例类,您可以用更精细的类来替换它们:
case class A(time: Long, aValue: String)
case class B(time: Long, bValue: String)
val tableA = Seq(A(1, "q1"), A(2, "q2"), A(3, "q3"), A(4, "q4"), A(5, "q5"), A(6, "q6"), A(7, "q7"))
val tableB = Seq(B(2, "t1"), B(5, "t2"), B(7, "t3"))
val dfA: DataFrame = sqlContext.createDataFrame(tableA)
val dfB: DataFrame = sqlContext.createDataFrame(tableB)
现在-两个备选方案(概念上完全相同):
使用SQL:
dfA.registerTempTable("a") dfB.registerTempTable("b") sqlContext.sql( """ |SELECT collect_list(c.time), collect_list(c.aValue), first(b.time), first(b.bValue) |FROM ( | SELECT FIRST(a.time) as time, FIRST(a.aValue) as aValue, MIN(b.time) AS bTime | FROM a | JOIN b ON b.time > a.time | GROUP BY a.time) AS c |JOIN b ON c.bTime = b.time |GROUP BY b.time """.stripMargin).show()
其将针对b的每个值(时间和bValue)打印a的时间列表和值列表。
使用数据帧:
import org.apache.spark.sql.functions._ val aWithMinB: DataFrame = dfA .join(dfB, dfA("time") < dfB("time")) .groupBy(dfA("time")) .agg(first(dfA("aValue")), min(dfB("time"))) .withColumnRenamed("FIRST(aValue)", "aValue") .withColumnRenamed("min(time)", "bTime") aWithMinB .join(dfB, dfB("time") === aWithMinB("bTime")) .groupBy(dfB("time")) .agg(collect_list(aWithMinB("time")), collect_list(aWithMinB("aValue")), first(dfB("time")), first(dfB("bValue"))) .show()
请注意,两者都只能与Spark 1.6.0或更高版本一起使用,因为collect_list
在早期版本中不存在。
更新:此处对流程的一些解释:
- 第一个查询(SQL版本中的内部查询)旨在为表
a
中的所有记录创建一个"公共值",这些记录应分组为结果中的单个记录 - 共同的价值观是什么?
a
中应该分组的值是b
中两个连续记录之间的值。因此,它们共享b.time
的最小值,即大于它们的时间。换言之,对于a
中的每个时间X,我们在b
中寻找大于X的最小时间。对于连续两个b
之间的a
中的所有记录,这将是相同的值 - 为了实现这一点,我们在
b.time > a.time
的条件下将a
与b
连接起来(对于a
的每个记录获得b
的多个记录),然后按a.time
分组(将结果缩小到a
中的每个记录一个记录),为每个这样的记录取最小值b.time
,并为每个a
列取第一个值(取第一个并不重要——所有分组记录对a
的所有列都具有相同的值!) - 现在我们为
a
中的每个记录都有了这个"额外信息",我们将其与time
列上的b
连接起来,并按该列分组。具有相同bTime
的所有a
记录都将连接到相应的b
记录,我们完成了:我们对b
的所有列再次使用first
(同样,由于我们根据b
的唯一标识符进行分组,所以所有分组记录的值都是相同的),并对a
的列使用collect_list
以列表形式获取所有值