如何在某些情况下在Spark数据帧中创建新的列'count'



我有一个关于Id,targetIP,Time列连接日志的DataFrame。这个DataFrame中的每条记录都是到一个系统的连接事件。Id表示本次连接,targetIP表示本次目标IP地址,time表示连接时间。与价值观:

targetIP3192.163.0.1

所以,你基本上需要的是一个窗口函数。

让我们从你的初始数据开始

import org.apache.spark.sql.expressions.Window
import spark.implicits._
case class Event(ID: Int, Time: Int, targetIP: String)
val events = Seq(
Event(1, 1, "192.163.0.1"),
Event(2, 2, "192.163.0.2"),
Event(3, 3, "192.163.0.1"),
Event(4, 5, "192.163.0.1"),
Event(5, 6, "192.163.0.2"),
Event(6, 7, "192.163.0.2"),
Event(7, 8, "192.163.0.2")
).toDS()

现在我们需要定义一个窗口函数本身

val timeWindow = Window.orderBy($"Time").rowsBetween(-2, -1)

现在是最有趣的部分:如何数窗户上的东西?没有简单的方法,所以我们将执行以下操作

  1. 将所有targetIp聚合到列表
  2. 过滤列表,只查找需要的ip
  3. 列表的计数大小
val df = events
.withColumn("tmp", collect_list($"targetIp").over(timeWindow))
.withColumn("count", size(expr("filter(tst, x -> x == targetIp)")))
.drop($"tmp")

结果将包含一个新列"count"这是我们需要的!

乌利希期刊指南:

有一个更短的版本,没有聚合,由@blackbishop编写,

val timeWindow = Window.partitionBy($"targetIP").orderBy($"Time").rangeBetween(-2, Window.currentRow)
val df = events
.withColumn("count", count("*").over(timeWindow) - lit(1))
.explain(true)

您可以使用count在窗口范围- 2和当前行之间,以获得IP在最近2个时间单位的计数。

使用Spark SQL你可以这样做:

df.createOrReplaceTempView("connection_logs")
df1 = spark.sql("""
SELECT  *,
COUNT(*) OVER(PARTITION BY targetIP ORDER BY Time 
RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
) -1 AS count
FROM    connection_logs
ORDER BY ID
""")
df1.show()
#+---+----+-----------+-----+
#| ID|Time|   targetIP|count|
#+---+----+-----------+-----+
#|  1|   1|192.163.0.1|    0|
#|  2|   2|192.163.0.2|    0|
#|  3|   3|192.163.0.1|    1|
#|  4|   5|192.163.0.1|    1|
#|  5|   6|192.163.0.2|    0|
#|  6|   7|192.163.0.2|    1|
#|  7|   8|192.163.0.2|    2|
#+---+----+-----------+-----+

或者使用DataFrame API

from pyspark.sql import Window
from pyspark.sql import functions as F
time_unit = lambda x: x
w = Window.partitionBy("targetIP").orderBy(col("Time").cast("int")).rangeBetween(-time_unit(2), 0)
df1 = df.withColumn("count", F.count("*").over(w) - 1).orderBy("ID")
df1.show()