在数据帧中添加新列,其中包含另一列值的相邻列数



我有一个这样的DataFrame

org.apache.spark.sql.DataFrame = [Timestamp: int, AccX: double ... 17 more fields]`

时间戳不成功,采用纪元格式。

我想添加一个新列,该列将具有接近当前行时间戳的时间戳数。

例:

时间戳

1
5
6
12
13
16

假设我们有 3 个范围。输出将是:

|      TimeStamp      |    New column    |
|          1          |         1        |
|          5          |         2        |
|          6          |         2        |
|          12         |         2        |
|          13         |         3        |
|          16         |         2        |

我正在考虑做这样的事情:

MyDF.map{x => MyDF.filter(MyDF("Timestamp").gt(x.getAs[Int]("Timestamp") - range).lt(x.getAs[Int]("Timestamp") + range) ).count()}

但这给我留下了一个:org.apache.spark.sql.Dataset[Long] = [value: bigint]

我不知道如何处理。

有没有人对如何处理这个问题有更好的想法?

谢谢

更新:我正在使用运行 Spark 版本 2.1.1 的齐柏林飞艇笔记本尝试 @Dennis Tsoi 提出的解决方案后,尝试对生成的数据帧执行操作(例如显示或收集)时出现错误。

以下是错误的全文:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2104)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:371)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withNewExecutionId(Dataset.scala:2788)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:638)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:597)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:606)
  ... 88 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.expressions.WindowSpec
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.expressions.WindowSpec, value: org.apache.spark.sql.expressions.WindowSpec@79df42d)
    - field (class: $iw, name: windowSpec, type: class org.apache.spark.sql.expressions.WindowSpec)
    - object (class $iw, $iw@20ade815)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@77cac38a)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1ebfd642)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1ee19937)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@67b1d8f0)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@16ca3d83)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@3129d731)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@142a2936)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@494facc5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@45e32c0a)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@509c3eb6)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7bba53a2)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@20971db8)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@ba81c26)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@9375cbb)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@3226a593)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@201516a3)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1ac15b76)
    - field (class: $line20176553781522.$read, name: $iw, type: class $iw)
    - object (class $line20176553781522.$read, $line20176553781522.$read@21cc8115)
    - field (class: $iw, name: $line20176553781522$read, type: class $line20176553781522.$read)
    - object (class $iw, $iw@57677eee)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1d619339)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@63f875)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@2a8641fe)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@279b1062)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@2a06eb02)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6071a045)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@36b8b963)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@49987884)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6cdfa5ad)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@3bea2150)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7d1c7dc)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@78f47403)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6327d388)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5d120092)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4da8dd9c)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@2afee9a4)
    - field (class: $line20176553782370.$read, name: $iw, type: class $iw)
    - object (class $line20176553782370.$read, $line20176553782370.$read@7112605e)
    - field (class: $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw, name: $line20176553782370$read, type: class $line20176553782370.$read)
    - object (class $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw, $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw@cc82e3c)
    - field (class: $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw, name: $outer, type: class $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw)
    - object (class $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw, $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw@9ec8a4e)
    - field (class: $$$$7f619eaa173efe86d354fc4efb19aab8$$$$$anonfun$1, name: $outer, type: class $$$$24338a4fbcb24dc6d683541cf6403767$$$$iw)
    - object (class $$$$7f619eaa173efe86d354fc4efb19aab8$$$$$anonfun$1, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[0, int, true]))
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:10
0)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 121 more

已更新

未批处理的查找操作(如 dtlt)可能非常昂贵,因此我想出了以下解决方案。

val timestampsDF = 
    Seq(
        ( 1, "smth1" ),
        ( 5, "smth2" ),
        ( 6, "smth3" ),
        ( 12, "smth4" ),
        ( 13, "smth5" ),
        ( 16, "smth6" )
    )
    .toDF( "TimeStamp", "smth" )
val timestampsStatic = 
    timestampsDF
    .select("TimeStamp")
    .as[ ( Int ) ]
    .collect()
def countNeighbors = udf( ( currentTs: Int, timestamps: Seq[ Int ] ) => {
    timestamps.count( ( ts ) => Math.abs( currentTs - ts ) <= 3 )
} )
val alltimeDF = 
    timestampsDF
    .withColumn( 
        "All TimeStamps", 
        lit( timestampsStatic )
    )
val neighborsDF =
    alltimeDF
    .withColumn( 
        "New Column", 
        countNeighbors( alltimeDF( "TimeStamp" ), alltimeDF( "All TimeStamps" ) )
    )
    .drop( "All TimeStamps" )
neighborsDF.show()

结果

+---------+-----+----------+
|TimeStamp| smth|New Column|
+---------+-----+----------+
|        1|smth1|         1|
|        5|smth2|         2|
|        6|smth3|         2|
|       12|smth4|         2|
|       13|smth5|         3|
|       16|smth6|         2|
+---------+-----+----------+

内存消耗问题

由于节点上只能访问数据帧,因此您必须将所有时间戳从原始 DF 复制到另一列作为静态字段。这将导致内存消耗增加,但您无法访问 UDF 中的所有列值,只能访问相应行中的列值。无论如何,我认为这是真正的"火花之道"。

相关内容

  • 没有找到相关文章

最新更新