i必须在pyspark中实现pandas .apply(函数,轴= 1)(应用行明智的函数)。由于我是新手,我不确定是否可以通过MAP功能或使用UDF来实现。我在任何地方都找不到任何类似的实现。
基本上我想要的是将行传递到函数做一些操作,以创建新列,这些新列取决于当前行和以前的行的值,然后返回修改后的行以创建一个新的数据框架。下面给出了与熊猫一起使用的功能之一:
previous = 1
def row_operation(row):
global previous
if pd.isnull(row["PREV_COL_A"])==True or (row["COL_A"]) != (row["PREV_COL_A"]):
current = 1
elif row["COL_C"] > cutoff:
current = previous +1
elif row["COL_C"]<=cutoff:
current = previous
else:
current = Nan
previous = current
return current
这里prev_col_a不过是1行滞后的col_a。
请注意,此功能是最简单的,并且不会返回行,但其他人则不会返回行。如果有人可以指导我如何在Pyspark中实施行操作,那将是一个很好的帮助。tia
您可以使用rdd.mappartition。它将为您提供一行的迭代器,并产生要返回的结果行。您获得的效果不允许您向前或向后索引,只需返回下一行即可。但是,您可以在处理时节省行以执行您需要做的任何事情。例如
def my_cool_function(rows):
prev_rows = []
for row in rows:
# Do some processing with all the rows, and return a result
yield my_new_row
if len(prev_rows) >= 2:
prev_rows = prev_rows[1:]
prev_rows.append(row)
updated_rdd = rdd.mapPartitions(my_cool_function)
注意,我使用列表来跟踪示例的分区,但是python列表确实是没有有效的头推/pop方法的数组,因此您可能需要使用实际的队列。