PySpark UDF函数与数据帧查询



我有另一个解决方案,但我更喜欢使用PySpark 2.3来完成。

我有一个像这样的二维PySpark数据帧:

Date       | ID
---------- | ----
08/31/2018 | 10
09/31/2018 | 10
09/01/2018 | null
09/01/2018 | null
09/01/2018 | 12

我想通过查找过去最接近的值来替换IDnull值,或者如果该值为null,则通过向前查找(如果再次为null,设置默认值)

我设想添加一个带有.withColumn的新列,并使用一个UDF函数来查询数据帧本身。

类似于伪代码中的东西(不完美,但这是主要思想):

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def return_value(value,date):
if value is not null:
return val
value1 = df.filter(df['date']<= date).select(df['value']).collect()
if (value1)[0][0] is not null:
return (value1)[0][0]
value2 = df.filter(tdf['date']>= date).select(df['value']).collect()
return (value2)[0][0]

value_udf = udf(return_value,StringType())
new_df = tr.withColumn("new_value", value_udf(df.value,df.date))

但它不起作用。我是不是完全走错了路?是否只能在UDF函数中查询Spark数据帧?我错过了一个更简单的解决方案吗?

创建具有一列唯一所有日期列表的新数据帧:

datesDF = yourDF.select('Date').distinct()

创建另一个包含日期和ID,但仅包含不存在null的日期和ID。而且,让每个日期只保留ID的第一次(无论是第一次)出现(从你的例子来看,每个日期可以有多行)

noNullsDF = yourDF.dropna().dropDuplicates(subset='Date')

现在让我们加入这两个,这样我们就有了所有日期的列表,无论我们有什么值(或空)

joinedDF = datesDF.join(noNullsDF, 'Date', 'left')

现在,对于每个日期,使用窗口函数从上一个日期和下一个日期获取ID的值,还可以重命名我们的ID列,这样以后连接的问题就会减少:

from pyspark.sql.window import Window
from pyspark.sql import functions as f
w = Window.orderBy('Date')
joinedDF = joinedDF.withColumn('previousID',f.lag('ID').over(w)) 
.withColumn('nextID',f.lead('ID').over(w))
.withColumnRenamed('ID','newID') 

现在让我们按日期将其加入到我们的原始数据帧

yourDF = yourDF.join(joinedDF, 'Date', 'left')

现在我们的Dataframe有4个ID列:

  1. 原始ID
  2. newID-给定日期的任何非null值的ID(如果有的话)或null
  3. previousID-上一日期的ID(如果有或为空,则为非空)
  4. nextID-下一个日期的ID(如果有或为空,则为非空)

现在我们需要将它们按顺序组合到finalID中:

  1. 原始值(如果不是null)
  2. 如果结果不为null,则当前日期的值(这与您的问题相反,但Panda代码建议您进行日期检查)
  3. 前一日期的值(如果不为空)
  4. 下一个日期的值(如果不为空)
  5. 一些默认值

我们只需通过聚结:

default = 0
finalDF = yourDF.select('Date', 
'ID',
f.coalesce('ID',
'newID',
'previousID',
'nextID',
f.lit(default)).alias('finalID')
)

相关内容

  • 没有找到相关文章

最新更新