使用 pyspark 用以前已知的良好值填充 null



有没有办法将 pyspark 数据帧中的null值替换为最后一个有效值?如果您认为需要它们进行窗口分区和排序,则还有其他timestampsession列。更具体地说,我想实现以下转换:

+---------+-----------+-----------+      +---------+-----------+-----------+
| session | timestamp |         id|      | session | timestamp |         id|
+---------+-----------+-----------+      +---------+-----------+-----------+
|        1|          1|       null|      |        1|          1|       null|
|        1|          2|        109|      |        1|          2|        109|
|        1|          3|       null|      |        1|          3|        109|
|        1|          4|       null|      |        1|          4|        109|
|        1|          5|        109| =>   |        1|          5|        109|
|        1|          6|       null|      |        1|          6|        109|
|        1|          7|        110|      |        1|          7|        110|
|        1|          8|       null|      |        1|          8|        110|
|        1|          9|       null|      |        1|          9|        110|
|        1|         10|       null|      |        1|         10|        110|
+---------+-----------+-----------+      +---------+-----------+-----------+

这使用 last 并忽略 null。

让我们重新创建类似于原始数据的内容:

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func
d = [{'session': 1, 'ts': 1}, {'session': 1, 'ts': 2, 'id': 109}, {'session': 1, 'ts': 3}, {'session': 1, 'ts': 4, 'id': 110}, {'session': 1, 'ts': 5},  {'session': 1, 'ts': 6}]
df = spark.createDataFrame(d)
df.show()
# +-------+---+----+
# |session| ts|  id|
# +-------+---+----+
# |      1|  1|null|
# |      1|  2| 109|
# |      1|  3|null|
# |      1|  4| 110|
# |      1|  5|null|
# |      1|  6|null|
# +-------+---+----+

现在,让我们使用窗口函数last

df.withColumn("id", func.last('id', True).over(Window.partitionBy('session').orderBy('ts').rowsBetween(-sys.maxsize, 0))).show()
# +-------+---+----+
# |session| ts|  id|
# +-------+---+----+
# |      1|  1|null|
# |      1|  2| 109|
# |      1|  3| 109|
# |      1|  4| 110|
# |      1|  5| 110|
# |      1|  6| 110|
# +-------+---+----+

这似乎是使用窗口函数的技巧:

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func
def fill_nulls(df):
    df_na = df.na.fill(-1)
    lag = df_na.withColumn('id_lag', func.lag('id', default=-1)
                           .over(Window.partitionBy('session')
                                 .orderBy('timestamp')))
    switch = lag.withColumn('id_change',
                            ((lag['id'] != lag['id_lag']) &
                             (lag['id'] != -1)).cast('integer'))

    switch_sess = switch.withColumn(
        'sub_session',
        func.sum("id_change")
        .over(
            Window.partitionBy("session")
            .orderBy("timestamp")
            .rowsBetween(-sys.maxsize, 0))
    )
    fid = switch_sess.withColumn('nn_id',
                           func.first('id')
                           .over(Window.partitionBy('session', 'sub_session')
                                 .orderBy('timestamp')))
    fid_na = fid.replace(-1, 'null')
    ff = fid_na.drop('id').drop('id_lag')
                          .drop('id_change')
                          .drop('sub_session').
                          withColumnRenamed('nn_id', 'id')
    return ff

这是完整的null_test.py。

@Oleksiy 的答案很棒,但不能完全满足我的要求。在一个会话中,如果观察到多个null,则所有都填充会话的第一个非null。我需要最后一个null值来向前传播。

以下调整适用于我的用例:

def fill_forward(df, id_column, key_column, fill_column):
    # Fill null's with last *non null* value in the window
    ff = df.withColumn(
        'fill_fwd',
        func.last(fill_column, True) # True: fill with last non-null
        .over(
            Window.partitionBy(id_column)
            .orderBy(key_column)
            .rowsBetween(-sys.maxsize, 0))
        )
    # Drop the old column and rename the new column
    ff_out = ff.drop(fill_column).withColumnRenamed('fill_fwd', fill_column)
    return ff_out

这是我遵循的技巧,将 pyspark 数据帧转换为熊猫数据帧并执行操作,因为 pandas 具有内置函数来填充空值以前已知的良好值。并将其改回 pyspark 数据帧。这是代码!!

d = [{'session': 1, 'ts': 1}, {'session': 1, 'ts': 2, 'id': 109}, {'session': 1, 'ts': 3}, {'session': 1, 'ts': 4, 'id': 110}, {'session': 1, 'ts': 5},  {'session': 1, 'ts': 6},{'session': 1, 'ts': 7, 'id': 110},{'session': 1, 'ts': 8},{'session': 1, 'ts': 9},{'session': 1, 'ts': 10}]
dt = spark.createDataFrame(d)

import pandas as pd
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
psdf= dt.select("*").toPandas()
psdf["id"].fillna(method='ffill', inplace=True)
dt= spark.createDataFrame(psdf)
dt.show()

相关内容

  • 没有找到相关文章

最新更新