数据帧将每一行空值替换为唯一的纪元时间



我在数据帧中有 3 行,在 2 行中,列id具有空值。我需要遍历该特定列 id 上的每一行,并替换为纪元时间,该纪元时间应该是唯一的,并且应该发生在数据帧本身中。怎么能做到呢? 例如:

id | name
1    a
null b
null c

我想要这个将 null 转换为纪元时间的数据帧。

id     |     name
1             a
1435232       b
1542344       c
df
.select(
when($"id").isNull, /*epoch time*/).otherwise($"id").alias("id"),
$"name"
)

编辑

您需要确保UDF足够精确 - 如果它只有毫秒分辨率,您将看到重复的值。请参阅下面的示例,该示例清楚地说明了我的方法有效:

scala> def rand(s: String): Double = Math.random
rand: (s: String)Double
scala> val udfF = udf(rand(_: String))
udfF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(StringType)))
scala> res11.select(when($"id".isNull, udfF($"id")).otherwise($"id").alias("id"), $"name").collect
res21: Array[org.apache.spark.sql.Row] = Array([0.6668195187088702,a], [0.920625293516218,b])

看看这个

scala>  val s1:Seq[(Option[Int],String)] = Seq( (Some(1),"a"), (null,"b"), (null,"c"))
s1: Seq[(Option[Int], String)] = List((Some(1),a), (null,b), (null,c))
scala> val df = s1.toDF("id","name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]
scala> val epoch = java.time.Instant.now.getEpochSecond
epoch: Long = 1539084285
scala> df.withColumn("id",when( $"id".isNull,epoch).otherwise($"id")).show
+----------+----+
|        id|name|
+----------+----+
|         1|   a|
|1539084285|   b|
|1539084285|   c|
+----------+----+

scala>

编辑1:

我使用了毫秒,然后我也得到了相同的值。Spark 不会在时间部分捕获纳秒。许多行可能获得相同的毫秒数。因此,您基于纪元获取唯一值的假设是行不通的。

scala> def getEpoch(x:String):Long = java.time.Instant.now.toEpochMilli
getEpoch: (x: String)Long
scala> val myudfepoch = udf( getEpoch(_:String):Long )
myudfepoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(StringType)))
scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)).otherwise($"id")).show
+-------------+----+
|           id|name|
+-------------+----+
|            1|   a|
|1539087300957|   b|
|1539087300957|   c|
+-------------+----+

scala>

唯一的可能性是使用单调的 RisingId,但该值可能并不总是具有相同的长度。

scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)+monotonicallyIncreasingId).otherwise($"id")).show
warning: there was one deprecation warning; re-run with -deprecation for details
+-------------+----+
|           id|name|
+-------------+----+
|            1|   a|
|1539090186541|   b|
|1539090186543|   c|
+-------------+----+

scala>

编辑2:

我能够欺骗System.nanoTime并获得递增的id,但它们不会是连续的,但长度可以保持。见下文

scala> def getEpoch(x:String):String = System.nanoTime.toString.take(12)
getEpoch: (x: String)String
scala>  val myudfepoch = udf( getEpoch(_:String):String )
myudfepoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)).otherwise($"id")).show
+------------+----+
|          id|name|
+------------+----+
|           1|   a|
|186127230392|   b|
|186127230399|   c|
+------------+----+

scala>

在集群中运行时尝试此操作,如果得到重复值,请调整 take(12(。

最新更新