Spark SQL可以引用前一个窗口/组的第一行吗?



我有一种事件流,看起来像这样:

Time UserId SessionId EventType EventData
1    2      A         Load      /a ...
2    1      B         Impressn  X ...
3    2      A         Impressn  Y ...
4    1      B         Load      /b ...
5    2      A         Load      /info ...
6    1      B         Load      /about ...
7    2      A         Impressn  Z ...

在实践中,用户可以在更大的时间窗口内进行多次会话,也有点击事件类型,但这里保持简单,我试图看到(页面浏览量)加载导致下一个加载,以及总的印象发生了什么。

所以,没有SQL,我已经加载了这个,按用户分组,按时间排序,并为每个会话标记每行与以前的加载信息(如果有的话)。

val outDS = logDataset.groupByKey(_.UserId)
      .flatMapGroups((_, iter) => gather(iter))

中gather按时间对iter进行排序(可能是冗余的,因为输入是按时间排序的),然后遍历序列,在每个新会话中将lastLoadData设置为null,将lastLoadData添加到每行,如果该行是Load类型,则将lastLoadData更新到该行的数据。生成如下代码:

Time UserId SessionId EventType EventData  LastLoadData
1    2      A         Load      / ...      null
2    1      B         Impressn  X ...      null
3    2      A         Impressn  Y ...      / ...
4    1      B         Load      / ...      null
5    2      A         Load      /info ...  / ...
6    1      B         Load      /about ... / ...
7    2      A         Impressn  Z ...      /info ...

允许我汇总哪些(页面浏览量)加载导致其他加载,或者在每个(页面)加载前5个印象事件。

    outDS.createOrReplaceTempView(tempTable)
    val journeyPageViews = sparkSession.sql(
      s"""SELECT lastLoadData, EventData,
         | count(distinct UserId) as users,
         | count(distinct SessionId) as sessions
         |FROM ${tempTable}
         |WHERE EventType='Load'
         |GROUP BY lastLoadData, EventData""".stripMargin)

但是,我觉得lastLoadData列的添加也可以使用Spark SQL窗口来完成,但是我挂在其中的两个部分:

  1. 如果我在UserId+SessionId按时间排序的窗口如何使其适用于所有事件,但看看以前的加载事件?(例如Impressn将获得一个新的列lastLoadData分配给这个窗口的以前的EventData)
  2. 如果我以某种方式使每个会话的Load事件的新窗口(也不确定如何),在窗口开始的Load事件(大概是"first")应该得到前一个窗口的"first"的lastLoadData;所以这可能也不是正确的方法。

您可以使用case when屏蔽null而不是Load的数据,使用last并将ignorenull设置为true来获得LastLoadData:

logDataset.createOrReplaceTempView("table")
val logDataset2 = spark.sql("""
select 
    *,
    last(case when EventType = 'Load' then EventData end, true) 
        over (partition by UserId, SessionId
              order by Time
              rows between unbounded preceding and 1 preceding) LastLoadData 
from table
order by time
""")
logDataset2.show
+----+------+---------+---------+----------+------------+
|Time|UserId|SessionId|EventType| EventData|LastLoadData|
+----+------+---------+---------+----------+------------+
|   1|     2|        A|     Load|    /a ...|        null|
|   2|     1|        B| Impressn|     X ...|        null|
|   3|     2|        A| Impressn|     Y ...|      /a ...|
|   4|     1|        B|     Load|    /b ...|        null|
|   5|     2|        A|     Load| /info ...|      /a ...|
|   6|     1|        B|     Load|/about ...|      /b ...|
|   7|     2|        A| Impressn|     Z ...|   /info ...|
+----+------+---------+---------+----------+------------+

相关内容

  • 没有找到相关文章

最新更新