Pyspark:'For'循环向数据帧添加行



我正在尝试使用for loop将新行添加到数据框架中。因此输入是:

ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz     25    123      234        345
Abc     40    456      567        678

我想要的输出是:

ColA  ColNum  ColB  ColB_lag1  ColB_lag2
 Xyz    25    123      234       345
 Xyz    26    789      123       234
 Abc    40    456      567       678
 Abc    41    890      456       567

所以,我拥有的代码是:

df = df.withColumn("ColNum", (df.ColNum + 1).cast(IntegerType())) 
       .withColumn("ColB_lag2", df.ColB_lag1)
       .withColumn("ColB_lag1", df.ColB)
       .withColumn("ColB", someFunc())

当我只需添加一行时,代码正常工作,但是当我必须在循环中添加多个行时会断开。因此,我用一个用于循环来完成它。我要在循环开头的最新行进行过滤,然后运行上面的逻辑以计算列的值。然后将新行附加到数据集中,该数据集再次在循环的顶部使用。输出最终看起来像这样:

ColA  ColNum  ColB  ColB_lag1  ColB_lag2
 Xyz    25    123      234       345
 Xyz    25    789      123
 Xyz    26    789      123
 Abc    40    456      567       678
 Abc    40    890      456
 Abc    41    890      456
>

如果我错过了这里的任何关键点,请分享更多详细信息。

编辑1:for循环如下:

num_months = 5
df_final = sc.read.csv(input_path, header='true').createOrReplaceTempView("df_final")
for i in range(num_months):
    df = sc.sql("""
        SELECT *
        FROM df_final mrd
        INNER JOIN
            (SELECT ColA AS ColA_tmp, MAX(fh_effdt) AS max_fh_effdt
            FROM df_final
            GROUP BY ColA) grouped_mrd
        ON mrd.ColA = grouped_mrd.ColA_tmp
        AND mrd.fh_effdt = grouped_mrd.max_fh_effdt
        """)
    df = df.drop(df.ColA_tmp).drop(df.max_fh_effdt).drop(df.ColB_lag2)
    df_tmp = df.withColumn("ColNum", (df.wala + 1).cast(IntegerType())) 
               .withColumn("ColB_lag2", df.ColB_lag1) 
               .withColumn("ColB_lag1", df.ColB) 
               .withColumn("ColB", someFunc())
    df_final = df_final.union(df_tmp)
df_final.persist()
df_final.coalesce(1).write.csv(output_path + scenario_name+"_df_final", mode='overwrite', header='true')

解决方案:问题是工会。由于我要丢下列并重新计算它们,因此Spark将这些列添加到末端,而" Union"按列位置进行联合而不是名称。这就是由于数据被新行的几列移动而在随后的循环中创建问题的原因。解决方案是从字面上选择所有列并在进行联合之前重新排序。上面的摘要简化了我可以在不掉落COLB_LAG2的情况下进行的。实际的代码在我从另一个数据框架上刷新一些值的位置之间有了另一个步骤,在从新的DataFrame中引入之前,需要删除这些列。

您的问题是您正在对数据框架的版本(来自CSV数据源的原始数据(创建临时视图,并期望它反映对df_final数据框架的更改变量。

临时视图df_final不包含循环运行时数据框架df_final的数据。数据帧是不可变的。解决此问题的一种方法是替换循环中的临时视图:

# the top part of your loop...
df_final = df_final.union(df_tmp)
df_final.createOrReplaceTempView("df_final")

最新更新