I have a DataFrame like this:
org.apache.spark.sql.DataFrame = [Timestamp: int, AccX: double ... 17 more fields]
The timestamps are in epoch format.
I want to add a new column that holds, for each row, the number of timestamps that are close to that row's timestamp.
Example:
TimeStamp
1
5
6
12
13
16
Let's say we have a range of 3. The output would be:
| TimeStamp | New column |
| --------- | ---------- |
| 1         | 1          |
| 5         | 2          |
| 6         | 2          |
| 12        | 2          |
| 13        | 3          |
| 16        | 2          |
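To make the counting rule concrete, here is a quick plain-Scala check (my own sketch, assuming "a range of 3" means |t1 - t2| <= 3 and that each timestamp counts itself):

// Count, for every timestamp, how many timestamps lie within +/- 3 of it.
val ts = Seq(1, 5, 6, 12, 13, 16)
val counts = ts.map(t => ts.count(other => math.abs(t - other) <= 3))
println(counts)  // List(1, 2, 2, 2, 3, 2)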
I was thinking of doing something like this:
MyDF.map{x => MyDF.filter(MyDF("Timestamp").gt(x.getAs[Int]("Timestamp") - range).lt(x.getAs[Int]("Timestamp") + range) ).count()}
But this leaves me with an org.apache.spark.sql.Dataset[Long] = [value: bigint], which I don't know what to do with.
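I also thought about a non-equi self-join (a rough, untested sketch of my own; range is the same window size as above), which would at least keep the count attached to each Timestamp instead of producing a bare Dataset[Long]:

import org.apache.spark.sql.functions._

// Join every timestamp against every other timestamp within +/- range,
// then count the matches per original timestamp.
val others = MyDF.select( MyDF( "Timestamp" ).as( "OtherTs" ) )
val countsDF =
  MyDF
    .select( "Timestamp" )
    .join( others, abs( col( "Timestamp" ) - col( "OtherTs" ) ) <= range )
    .groupBy( "Timestamp" )
    .agg( count( "*" ).as( "New column" ) )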
Does anyone have a better idea of how to approach this?
Thanks
Update: I am using a Zeppelin notebook running Spark version 2.1.1. After trying the solution proposed by @Dennis Tsoi, I get an error when I try to perform an action (such as show or collect) on the resulting DataFrame.
Here is the full text of the error:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2104)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at org.apache.spark.sql.Dataset.show(Dataset.scala:638)
at org.apache.spark.sql.Dataset.show(Dataset.scala:597)
at org.apache.spark.sql.Dataset.show(Dataset.scala:606)
... 88 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.expressions.WindowSpec
Serialization stack:
- object not serializable (class: org.apache.spark.sql.expressions.WindowSpec, value: org.apache.spark.sql.expressions.WindowSpec@79df42d)
- field (class: $iw, name: windowSpec, type: class org.apache.spark.sql.expressions.WindowSpec)
- object (class $iw, $iw@20ade815)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@77cac38a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1ebfd642)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1ee19937)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@67b1d8f0)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@16ca3d83)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3129d731)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@142a2936)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@494facc5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@45e32c0a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@509c3eb6)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7bba53a2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@20971db8)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@ba81c26)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@9375cbb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3226a593)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@201516a3)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1ac15b76)
- field (class: $line20176553781522.$read, name: $iw, type: class $iw)
- object (class $line20176553781522.$read, $line20176553781522.$read@21cc8115)
- field (class: $iw, name: $line20176553781522$read, type: class $line20176553781522.$read)
- object (class $iw, $iw@57677eee)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1d619339)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@63f875)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2a8641fe)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@279b1062)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2a06eb02)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6071a045)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@36b8b963)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@49987884)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6cdfa5ad)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3bea2150)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7d1c7dc)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@78f47403)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6327d388)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5d120092)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4da8dd9c)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2afee9a4)
- field (class: $line20176553782370.$read, name: $iw, type: class $iw)
- object (class $line20176553782370.$read, $line20176553782370.$read@7112605e)
- field (class: $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw, name: $line20176553782370$read, type: class $line20176553782370.$read)
- object (class $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw, $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw@cc82e3c)
- field (class: $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw, name: $outer, type: class $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw)
- object (class $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw, $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw@9ec8a4e)
- field (class: $$$$7f619eaa173efe86d354fc4efb19aab8$$$$$anonfun$1, name: $outer, type: class $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw)
- object (class $$$$7f619eaa173efe86d354fc4efb19aab8$$$$$anonfun$1, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[0, int, true]))
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 121 more
Updated
Unbatched lookup operations (such as gt and lt) can be very expensive, so I came up with the following solution.
import org.apache.spark.sql.functions.{ lit, udf }
import spark.implicits._  // needed for .toDF and .as[Int]

// Sample data with the timestamps from the question.
val timestampsDF =
  Seq(
    ( 1, "smth1" ),
    ( 5, "smth2" ),
    ( 6, "smth3" ),
    ( 12, "smth4" ),
    ( 13, "smth5" ),
    ( 16, "smth6" )
  )
  .toDF( "TimeStamp", "smth" )

// Collect all timestamps to the driver as a plain array.
val timestampsStatic =
  timestampsDF
    .select( "TimeStamp" )
    .as[ Int ]
    .collect()

// Counts how many of the given timestamps lie within +/- 3 of the current one.
def countNeighbors = udf( ( currentTs: Int, timestamps: Seq[ Int ] ) => {
  timestamps.count( ( ts ) => Math.abs( currentTs - ts ) <= 3 )
} )

// Attach the full list of timestamps to every row as a literal column
// (on newer Spark versions, functions.typedLit may be needed for array literals).
val alltimeDF =
  timestampsDF
    .withColumn(
      "All TimeStamps",
      lit( timestampsStatic )
    )

val neighborsDF =
  alltimeDF
    .withColumn(
      "New Column",
      countNeighbors( alltimeDF( "TimeStamp" ), alltimeDF( "All TimeStamps" ) )
    )
    .drop( "All TimeStamps" )

neighborsDF.show()
Result
+---------+-----+----------+
|TimeStamp| smth|New Column|
+---------+-----+----------+
| 1|smth1| 1|
| 5|smth2| 2|
| 6|smth3| 2|
| 12|smth4| 2|
| 13|smth5| 3|
| 16|smth6| 2|
+---------+-----+----------+
Memory consumption concerns
Since only the rows of a DataFrame are accessible on the worker nodes, you have to copy all the timestamps from the original DF into an extra column as a static field. This increases memory consumption, but a UDF cannot see all the column values, only those of the corresponding row. In any case, I think this is the true "Spark way".
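One possible variant to reduce that overhead (my own untested sketch building on the code above, not something I have benchmarked): broadcast the collected timestamps once instead of duplicating them into a column of every row.

import org.apache.spark.sql.functions.udf

// Ship the collected timestamps to the executors once via a broadcast variable.
val tsBroadcast = spark.sparkContext.broadcast( timestampsStatic )

// The UDF now only needs the current row's timestamp.
def countNeighborsBc = udf( ( currentTs: Int ) =>
  tsBroadcast.value.count( ts => Math.abs( currentTs - ts ) <= 3 )
)

val neighborsBcDF =
  timestampsDF
    .withColumn( "New Column", countNeighborsBc( timestampsDF( "TimeStamp" ) ) )

neighborsBcDF.show()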