如何在Pyspark Dataframe中顺序迭代行



我有一个类似这样的Spark DataFrame:

+-------+------+-----+---------------+
|Account|nature|value|           time|
+-------+------+-----+---------------+
|      a|     1|   50|10:05:37:293084|
|      a|     1|   50|10:06:46:806510|
|      a|     0|   50|11:19:42:951479|
|      a|     1|   40|19:14:50:479055|
|      a|     0|   50|16:56:17:251624|
|      a|     1|   40|16:33:12:133861|
|      a|     1|   20|17:33:01:385710|
|      b|     0|   30|12:54:49:483725|
|      b|     0|   40|19:23:25:845489|
|      b|     1|   30|10:58:02:276576|
|      b|     1|   40|12:18:27:161290|
|      b|     0|   50|12:01:50:698592|
|      b|     0|   50|08:45:53:894441|
|      b|     0|   40|17:36:55:827330|
|      b|     1|   50|17:18:41:728486|
+-------+------+-----+---------------+

我想将一行的性质列与具有相同帐户的其他行进行比较,我应该向前看,并添加名为重复的新列。如果性质发生变化,则两行的新列都将为true,从1变为0或反之亦然。例如,上面的数据帧应该是这样的:

+-------+------+-----+---------------+--------+
|Account|nature|value|           time|Repeated|
+-------+------+-----+---------------+--------+
|      a|     1|   50|10:05:37:293084|   true |
|      a|     1|   50|10:06:46:806510|    true|
|      a|     0|   50|11:19:42:951479|   true |
|      a|     0|   50|16:56:17:251624|   true |
|      b|     0|   50|08:45:53:894441|   true |
|      b|     0|   50|12:01:50:698592|   false|
|      b|     1|   50|17:18:41:728486|   true |
|      a|     1|   40|16:33:12:133861|   false|
|      a|     1|   40|19:14:50:479055|   false|
|      b|     1|   40|12:18:27:161290|    true|
|      b|     0|   40|17:36:55:827330|   true |
|      b|     0|   40|19:23:25:845489|   false|
|      b|     1|   30|10:58:02:276576|    true|
|      b|     0|   30|12:54:49:483725|   true |
|      a|     1|   20|17:33:01:385710|   false|
+-------+------+-----+---------------+--------+              

我的解决方案是,我必须按或帐户列上的窗口进行分组;然后在每组中,将每行的性质与其他行的列变满。我用Spark Window函数做了这个计算。像这样:

windowSpec  = Window.partitionBy("Account","value").orderBy("time")
df.withColumn("Repeated", coalesce(f.when(lead(df['nature']).over(windowSpec)!=df['nature'],lit(True)).otherwise(False))).show()

结果是这样的,这不是我想要的结果:

+-------+------+-----+---------------+--------+
|Account|nature|value|           time|Repeated|
+-------+------+-----+---------------+--------+
|      a|     1|   50|10:05:37:293084|   false|
|      a|     1|   50|10:06:46:806510|    true|
|      a|     0|   50|11:19:42:951479|   false|
|      a|     0|   50|16:56:17:251624|   false|
|      b|     0|   50|08:45:53:894441|   false|
|      b|     0|   50|12:01:50:698592|    true|
|      b|     1|   50|17:18:41:728486|   false|
|      a|     1|   40|16:33:12:133861|   false|
|      a|     1|   40|19:14:50:479055|   false|
|      b|     1|   40|12:18:27:161290|    true|
|      b|     0|   40|17:36:55:827330|   false|
|      b|     0|   40|19:23:25:845489|   false|
|      b|     1|   30|10:58:02:276576|    true|
|      b|     0|   30|12:54:49:483725|   false|
|      a|     1|   20|17:33:01:385710|   false|
+-------+------+-----+---------------+--------+

UPDATE:为了解释更多,如果我们假设第一个Spark数据帧被命名为";df";,在下文中,我写下了每组";帐户";以及";值":

a = df.withColumn('repeated',lit(False))
for i in range(len(group)):
j = i+1
for j in j<=len(group):
if a.loc[i,'nature']!=a.loc[j,'nature'] and  a.loc[j,'repeated']==False:
a.loc[i,'repeated'] = True
a.loc[j,'repeated'] = True

你能指导我如何使用Pyspark Window吗?

非常感谢您的帮助。

您实际上需要保证您在数据帧中看到的顺序是实际的顺序。你能做到吗?您需要一个列来按顺序排列所发生的事情。在数据帧中插入新数据并不能保证其顺序。

一个窗口&滞后将允许您查看前一行的值并进行所需的调整
仅供参考:我在这里使用聚结,就好像它是第一行一样——没有可供比较的值。考虑使用第二个参数来合并您认为适合帐户中第一个值的情况。)

如果你需要的话,看看单调递增函数。它可以帮助您按值创建订单,这是我们决定性地查看这些数据所必需的。

from pyspark.sql.functions import lag 
from pyspark.sql.functions import lit 
from pyspark.sql.functions import coalesce
from pyspark.sql.window import Window
spark.sql("create table nature (Account string,nature int, value int, order int)"); 
spark.sql("insert into nature values ('a', 1, 50,1), ('a', 1, 40,2),('a',0,50,3),('b',0,30,4),('b',0,40,5),('b',1,30,6),('b',1,40,7)")
windowSpec  = Window.partitionBy("Account").orderBy("order")
nature = spark.table("nature");
nature.withColumn("Repeated", coalesce( lead(nature['nature']).over(windowSpec) != nature['nature'], lit(True)) ).show()
|Account|nature|value|order|Repeated|
+-------+------+-----+-----+--------+
|      b|     0|   30|    4|   false|
|      b|     0|   40|    5|    true|
|      b|     1|   30|    6|   false|
|      b|     1|   40|    7|    true|
|      a|     1|   50|    1|   false|
|      a|     1|   40|    2|    true|
|      a|     0|   50|    3|    true|
+-------+------+-----+-----+--------+

编辑:根据你的描述,我不清楚该向前看还是向后看。我已经更改了代码以向前看一行,因为这与您输出中的帐户"B"一致。然而,帐户"A"的逻辑似乎与示例输出中"B"的逻辑不同。(或者我不理解从"1"开始而不是从"0"开始的微妙含义。)如果你想向前看一行,请使用lead,如果你想向后看一行则使用lag

问题已解决。尽管这种方式成本很高,但没关系。

def check(part):
df = part
size = len(df)
for i in range(size):
if (df.loc[i,'repeated'] == True):
continue
else:
for j in range((i+1),size):
if (df.loc[i,'nature']!=df.loc[j,'nature']) & (df.loc[j,'repeated']==False):
df.loc[j,'repeated'] = True
df.loc[i,'repeated'] = True
break
return df
df.groupby("Account","value").applyInPandas(check, schema="Account string, nature int,value long,time string,repeated boolean").show()

更新1:另一个没有任何迭代的解决方案。

def check(df):
df = df.sort_values('verified_time')
df['index'] = df.index
df['IS_REPEATED'] = 0
df1 = df.sort_values(['nature'],ascending=[True]).reset_index(drop=True)
df2 = df.sort_values(['nature'],ascending=[False]).reset_index(drop=True)
df1['IS_REPEATED']=df1['nature']^df2['nature']
df3 = df1.sort_values(['index'],ascending=[True])
df = df3.drop(['index'],axis=1)
return df
df = df.groupby("account", "value").applyInPandas(gf.check2,schema=gf.get_schema('trx'))

更新2:带有Spark窗口的解决方案:

def is_repeated_feature(df):
windowPartition = Window.partitionBy("account", "value", 'nature').orderBy('nature')
df_1 = df.withColumn('rank', F.row_number().over(windowPartition))
w = (Window
.partitionBy('account', 'value')
.orderBy('nature')
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df_1 = df_1.withColumn("count_nature", F.count('nature').over(w))
df_1 = df_1.withColumn('sum_nature', F.sum('nature').over(w))
df_1 = df_1.select('*')
df_2 = df_1.withColumn('min_val',
when((df_1.sum_nature > (df_1.count_nature - df_1.sum_nature)),
(df_1.count_nature - df_1.sum_nature)).otherwise(df_1.sum_nature))
df_2 = df_2.withColumn('more_than_one', when(df_2.count_nature > 1, '1').otherwise('0'))
df_2 = df_2.withColumn('is_repeated',
when(((df_2.more_than_one == 1) & (df_2.count_nature > df_2.sum_nature) & (
df_2.rank <= df_2.min_val)), '1')
.otherwise('0'))
return df_2

最新更新