在PySpark中的数据框架上,行明智的操作或行动



i必须在pyspark中实现pandas .apply(函数,轴= 1)(应用行明智的函数)。由于我是新手,我不确定是否可以通过MAP功能或使用UDF来实现。我在任何地方都找不到任何类似的实现。

基本上我想要的是将行传递到函数做一些操作,以创建新列,这些新列取决于当前行和以前的行的值,然后返回修改后的行以创建一个新的数据框架。下面给出了与熊猫一起使用的功能之一:

previous = 1
def row_operation(row):
    global previous
    if pd.isnull(row["PREV_COL_A"])==True or (row["COL_A"]) != (row["PREV_COL_A"]):
        current = 1
    elif row["COL_C"] > cutoff:
        current = previous +1
    elif row["COL_C"]<=cutoff:
        current = previous
    else:
        current = Nan
    previous = current
    return current

这里prev_col_a不过是1行滞后的col_a。

请注意,此功能是最简单的,并且不会返回行,但其他人则不会返回行。如果有人可以指导我如何在Pyspark中实施行操作,那将是一个很好的帮助。tia

您可以使用rdd.mappartition。它将为您提供一行的迭代器,并产生要返回的结果行。您获得的效果不允许您向前或向后索引,只需返回下一行即可。但是,您可以在处理时节省行以执行您需要做的任何事情。例如

def my_cool_function(rows):
    prev_rows = []
    for row in rows:
       # Do some processing with all the rows, and return a result
       yield my_new_row
       if len(prev_rows) >= 2:
           prev_rows = prev_rows[1:]
       prev_rows.append(row)
updated_rdd = rdd.mapPartitions(my_cool_function)

注意,我使用列表来跟踪示例的分区,但是python列表确实是没有有效的头推/pop方法的数组,因此您可能需要使用实际的队列。

相关内容

  • 没有找到相关文章

最新更新