SQL简单联接查询



我首先说我是SQL的新手,所以这个问题可能很琐碎。我有两张带时间戳键的桌子。对于table 1中的每个事件t_i,我希望table 2中的所有事件q使得:

q.timeStamp < t_i.timeStamp and q.timeStamp > t_{i-1}.timeStamp

也就是说,如果事件按照时间戳的顺序发生:

    q1
t1  q2
    q3
    q4
t2  q5
    q6
t3  q7

那么结果查询应该是:

t1: q1
t2: q2 q3 q4
t3: q5 q6

我将Scala与带有DataSet和DataFrame类的SQL Spark一起使用,所以无论是纯函数的"groupBy"还是SQL查询都是不错的。

首先,它并不是一个非常"简单"的查询。。。

首先,让我们用一些示例数据创建数据帧-我创建了只有时间和字符串值的小案例类,您可以用更精细的类来替换它们:

case class A(time: Long, aValue: String)
case class B(time: Long, bValue: String)
val tableA = Seq(A(1, "q1"), A(2, "q2"), A(3, "q3"), A(4, "q4"), A(5, "q5"), A(6, "q6"), A(7, "q7"))
val tableB = Seq(B(2, "t1"), B(5, "t2"), B(7, "t3"))
val dfA: DataFrame = sqlContext.createDataFrame(tableA)
val dfB: DataFrame = sqlContext.createDataFrame(tableB)

现在-两个备选方案(概念上完全相同):

  1. 使用SQL

    dfA.registerTempTable("a")
    dfB.registerTempTable("b")
    sqlContext.sql(
      """
        |SELECT collect_list(c.time), collect_list(c.aValue), first(b.time), first(b.bValue)
        |FROM (
        |  SELECT FIRST(a.time) as time, FIRST(a.aValue) as aValue, MIN(b.time) AS bTime
        |  FROM a
        |  JOIN b ON b.time > a.time
        |  GROUP BY a.time) AS c
        |JOIN b ON c.bTime = b.time
        |GROUP BY b.time
      """.stripMargin).show()
    

    其将针对b的每个值(时间和bValue)打印a的时间列表和值列表。

  2. 使用数据帧

    import org.apache.spark.sql.functions._
    val aWithMinB: DataFrame = dfA
      .join(dfB, dfA("time") < dfB("time"))
      .groupBy(dfA("time"))
      .agg(first(dfA("aValue")), min(dfB("time")))
      .withColumnRenamed("FIRST(aValue)", "aValue")
      .withColumnRenamed("min(time)", "bTime")
    aWithMinB
      .join(dfB, dfB("time") === aWithMinB("bTime"))
      .groupBy(dfB("time"))
      .agg(collect_list(aWithMinB("time")), collect_list(aWithMinB("aValue")), first(dfB("time")), first(dfB("bValue")))
      .show()
    

请注意,两者都只能与Spark 1.6.0或更高版本一起使用,因为collect_list在早期版本中不存在。

更新:此处对流程的一些解释:

  • 第一个查询(SQL版本中的内部查询)旨在为表a中的所有记录创建一个"公共值",这些记录应分组为结果中的单个记录
  • 共同的价值观是什么?a中应该分组的值是b中两个连续记录之间的值。因此,它们共享b.time最小值,即大于它们的时间。换言之,对于a中的每个时间X,我们在b中寻找大于X的最小时间。对于连续两个b之间的a中的所有记录,这将是相同的值
  • 为了实现这一点,我们在b.time > a.time的条件下将ab连接起来(对于a的每个记录获得b的多个记录),然后按a.time分组(将结果缩小到a中的每个记录一个记录),为每个这样的记录取最小值b.time,并为每个a列取第一个值(取第一个并不重要——所有分组记录对a的所有列都具有相同的值!)
  • 现在我们为a中的每个记录都有了这个"额外信息",我们将其与time列上的b连接起来,并按该列分组。具有相同bTime的所有a记录都将连接到相应的b记录,我们完成了:我们对b的所有列再次使用first(同样,由于我们根据b的唯一标识符进行分组,所以所有分组记录的值都是相同的),并对a的列使用collect_list以列表形式获取所有值

相关内容

  • 没有找到相关文章

最新更新