我有一个关于Id
,targetIP
,Time
列连接日志的DataFrame。这个DataFrame中的每条记录都是到一个系统的连接事件。Id表示本次连接,targetIP
表示本次目标IP地址,time表示连接时间。与价值观:
所以,你基本上需要的是一个窗口函数。
让我们从你的初始数据开始
import org.apache.spark.sql.expressions.Window
import spark.implicits._
case class Event(ID: Int, Time: Int, targetIP: String)
val events = Seq(
Event(1, 1, "192.163.0.1"),
Event(2, 2, "192.163.0.2"),
Event(3, 3, "192.163.0.1"),
Event(4, 5, "192.163.0.1"),
Event(5, 6, "192.163.0.2"),
Event(6, 7, "192.163.0.2"),
Event(7, 8, "192.163.0.2")
).toDS()
现在我们需要定义一个窗口函数本身
val timeWindow = Window.orderBy($"Time").rowsBetween(-2, -1)
现在是最有趣的部分:如何数窗户上的东西?没有简单的方法,所以我们将执行以下操作
- 将所有targetIp聚合到列表
- 过滤列表,只查找需要的ip
- 列表的计数大小
val df = events
.withColumn("tmp", collect_list($"targetIp").over(timeWindow))
.withColumn("count", size(expr("filter(tst, x -> x == targetIp)")))
.drop($"tmp")
结果将包含一个新列"count"这是我们需要的!
乌利希期刊指南:
有一个更短的版本,没有聚合,由@blackbishop编写,
val timeWindow = Window.partitionBy($"targetIP").orderBy($"Time").rangeBetween(-2, Window.currentRow)
val df = events
.withColumn("count", count("*").over(timeWindow) - lit(1))
.explain(true)
您可以使用count
在窗口范围- 2和当前行之间,以获得IP在最近2个时间单位的计数。
使用Spark SQL你可以这样做:
df.createOrReplaceTempView("connection_logs")
df1 = spark.sql("""
SELECT *,
COUNT(*) OVER(PARTITION BY targetIP ORDER BY Time
RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
) -1 AS count
FROM connection_logs
ORDER BY ID
""")
df1.show()
#+---+----+-----------+-----+
#| ID|Time| targetIP|count|
#+---+----+-----------+-----+
#| 1| 1|192.163.0.1| 0|
#| 2| 2|192.163.0.2| 0|
#| 3| 3|192.163.0.1| 1|
#| 4| 5|192.163.0.1| 1|
#| 5| 6|192.163.0.2| 0|
#| 6| 7|192.163.0.2| 1|
#| 7| 8|192.163.0.2| 2|
#+---+----+-----------+-----+
或者使用DataFrame API
from pyspark.sql import Window
from pyspark.sql import functions as F
time_unit = lambda x: x
w = Window.partitionBy("targetIP").orderBy(col("Time").cast("int")).rangeBetween(-time_unit(2), 0)
df1 = df.withColumn("count", F.count("*").over(w) - 1).orderBy("ID")
df1.show()
相关内容
- 如何在sqs上指定使用count元素创建资源
- 多线程问题:当count变为非零时创建文件,当count变为零时删除文件
- DAX度量,从已创建的度量计算Sum和Count
- 我有一个问题我创建了一个计数器应用程序,然后我添加了一个输入字段.但是我希望我在字段中输入的内容能够反映在count部分
- 基于count遍历XML并使用spark scala创建ArrayString
- Pandas:创建count cumsum列并重置if condition
- 从dataframe列创建set和count字典
- 如何在某些情况下在Spark数据帧中创建新的列'count'
- 使用count和for_each一起通过terraform创建AKS附加节点池
- 创建一个度量,根据不同于长表的count结果取平均值?
- 如何通过在terraform中给出count作为输入来动态创建虚拟列表
- 在r中创建Count函数
- 有条件地创建aws_security_group_rule, count在terraform中 &g
- terrafor from -使用count在2个子网中创建3个服务器
- 我在创建触发器以删除count>10的行时遇到了问题
- 在spring-boot java中创建count limit查询,以获取no of元素
- 如何使用for_each或count for azure为多个名称创建具有存储帐户的多个资源组
- 使用count和regex在地形中自动创建多个资源
- 如何从序列创建"count"矩阵?
- 在sql select中创建count in count
最新更新
- 清理脚本(而不是在提供的路径中搜索)总是命中系统根目录或运行它的根文件夹
- 无法加载要从中扩展的配置"airbnb" - gitlab ci
- 我如何将任何方程输入到Python中的标准形式?
- 在Swift中从实时数据库Firebase中获取数据
- Devstack单一接口不能在ubuntu 20.04上工作
- Typescript没有安装
- 函数式语言类型推断混乱
- 使用Python从大文件解析数字数据时提高速度
- 如何从Multipass共享文件夹到主机?
- 从CMD或批处理文件中禁用"Notify me when the clock changes"设置
- 在Dockerfile中设置——net=host ?
- 如何在React中将arrayBuffer转换为JSON
- 是否有可能将html响应转换为json在扑动?
- 快速过滤numpy数组值的方法
- C保存字符串的数组列表
- 需要minio film配置建议
- 如何应用CSS字体大小"relative to what it would be originally"?
- 该应用程序在个人帐户中未绑定脚本时被阻止错误
- 将原始查询转换为django orm
- 如何从出现次数和值的列表中创建一个新列表
- 使用Julia中的Julia Broadcasting根据数组的索引计算数组值
- 在javascript中使用条件更新嵌套数组
- for循环多个条件
- 将第一列中的名称行转换为r中的列
- Nx张量的映射切片
- Dotnet Core Azure功能(隔离进程)如何加载应用程序.每个环境的Json
- Discordjs不发送消息
- 边框在css中不显示
- Python pandas中的深度嵌套JSON规范化
- 如何在JSON模式中从正确的对象中选择特定的字段
热门标签:
javascript python java c# php android html jquery c++ css ios sql mysql arrays asp.net json python-3.x ruby-on-rails .net sql-server django objective-c excel regex ruby linux ajax iphone xml vba spring asp.net-mvc database wordpress string postgresql wpf windows xcode bash git oracle list vb.net multithreading eclipse algorithm macos powershell visual-studio image forms numpy scala function api selenium