我有一个类似这样的Spark DataFrame:
+-------+------+-----+---------------+
|Account|nature|value| time|
+-------+------+-----+---------------+
| a| 1| 50|10:05:37:293084|
| a| 1| 50|10:06:46:806510|
| a| 0| 50|11:19:42:951479|
| a| 1| 40|19:14:50:479055|
| a| 0| 50|16:56:17:251624|
| a| 1| 40|16:33:12:133861|
| a| 1| 20|17:33:01:385710|
| b| 0| 30|12:54:49:483725|
| b| 0| 40|19:23:25:845489|
| b| 1| 30|10:58:02:276576|
| b| 1| 40|12:18:27:161290|
| b| 0| 50|12:01:50:698592|
| b| 0| 50|08:45:53:894441|
| b| 0| 40|17:36:55:827330|
| b| 1| 50|17:18:41:728486|
+-------+------+-----+---------------+
我想将一行的性质列与具有相同帐户和值的其他行进行比较,我应该向前看,并添加名为重复的新列。如果性质发生变化,则两行的新列都将为true,从1变为0或反之亦然。例如,上面的数据帧应该是这样的:
+-------+------+-----+---------------+--------+
|Account|nature|value| time|Repeated|
+-------+------+-----+---------------+--------+
| a| 1| 50|10:05:37:293084| true |
| a| 1| 50|10:06:46:806510| true|
| a| 0| 50|11:19:42:951479| true |
| a| 0| 50|16:56:17:251624| true |
| b| 0| 50|08:45:53:894441| true |
| b| 0| 50|12:01:50:698592| false|
| b| 1| 50|17:18:41:728486| true |
| a| 1| 40|16:33:12:133861| false|
| a| 1| 40|19:14:50:479055| false|
| b| 1| 40|12:18:27:161290| true|
| b| 0| 40|17:36:55:827330| true |
| b| 0| 40|19:23:25:845489| false|
| b| 1| 30|10:58:02:276576| true|
| b| 0| 30|12:54:49:483725| true |
| a| 1| 20|17:33:01:385710| false|
+-------+------+-----+---------------+--------+
我的解决方案是,我必须按或帐户和值列上的窗口进行分组;然后在每组中,将每行的性质与其他行的复列变满。我用Spark Window函数做了这个计算。像这样:
windowSpec = Window.partitionBy("Account","value").orderBy("time")
df.withColumn("Repeated", coalesce(f.when(lead(df['nature']).over(windowSpec)!=df['nature'],lit(True)).otherwise(False))).show()
结果是这样的,这不是我想要的结果:
+-------+------+-----+---------------+--------+
|Account|nature|value| time|Repeated|
+-------+------+-----+---------------+--------+
| a| 1| 50|10:05:37:293084| false|
| a| 1| 50|10:06:46:806510| true|
| a| 0| 50|11:19:42:951479| false|
| a| 0| 50|16:56:17:251624| false|
| b| 0| 50|08:45:53:894441| false|
| b| 0| 50|12:01:50:698592| true|
| b| 1| 50|17:18:41:728486| false|
| a| 1| 40|16:33:12:133861| false|
| a| 1| 40|19:14:50:479055| false|
| b| 1| 40|12:18:27:161290| true|
| b| 0| 40|17:36:55:827330| false|
| b| 0| 40|19:23:25:845489| false|
| b| 1| 30|10:58:02:276576| true|
| b| 0| 30|12:54:49:483725| false|
| a| 1| 20|17:33:01:385710| false|
+-------+------+-----+---------------+--------+
UPDATE:为了解释更多,如果我们假设第一个Spark数据帧被命名为";df";,在下文中,我写下了每组";帐户";以及";值":
a = df.withColumn('repeated',lit(False))
for i in range(len(group)):
j = i+1
for j in j<=len(group):
if a.loc[i,'nature']!=a.loc[j,'nature'] and a.loc[j,'repeated']==False:
a.loc[i,'repeated'] = True
a.loc[j,'repeated'] = True
你能指导我如何使用Pyspark Window吗?
非常感谢您的帮助。
您实际上需要保证您在数据帧中看到的顺序是实际的顺序。你能做到吗?您需要一个列来按顺序排列所发生的事情。在数据帧中插入新数据并不能保证其顺序。
一个窗口&滞后将允许您查看前一行的值并进行所需的调整
仅供参考:我在这里使用聚结,就好像它是第一行一样——没有可供比较的值。考虑使用第二个参数来合并您认为适合帐户中第一个值的情况。)
如果你需要的话,看看单调递增函数。它可以帮助您按值创建订单,这是我们决定性地查看这些数据所必需的。
from pyspark.sql.functions import lag
from pyspark.sql.functions import lit
from pyspark.sql.functions import coalesce
from pyspark.sql.window import Window
spark.sql("create table nature (Account string,nature int, value int, order int)");
spark.sql("insert into nature values ('a', 1, 50,1), ('a', 1, 40,2),('a',0,50,3),('b',0,30,4),('b',0,40,5),('b',1,30,6),('b',1,40,7)")
windowSpec = Window.partitionBy("Account").orderBy("order")
nature = spark.table("nature");
nature.withColumn("Repeated", coalesce( lead(nature['nature']).over(windowSpec) != nature['nature'], lit(True)) ).show()
|Account|nature|value|order|Repeated|
+-------+------+-----+-----+--------+
| b| 0| 30| 4| false|
| b| 0| 40| 5| true|
| b| 1| 30| 6| false|
| b| 1| 40| 7| true|
| a| 1| 50| 1| false|
| a| 1| 40| 2| true|
| a| 0| 50| 3| true|
+-------+------+-----+-----+--------+
编辑:根据你的描述,我不清楚该向前看还是向后看。我已经更改了代码以向前看一行,因为这与您输出中的帐户"B"一致。然而,帐户"A"的逻辑似乎与示例输出中"B"的逻辑不同。(或者我不理解从"1"开始而不是从"0"开始的微妙含义。)如果你想向前看一行,请使用lead
,如果你想向后看一行则使用lag
。
问题已解决。尽管这种方式成本很高,但没关系。
def check(part):
df = part
size = len(df)
for i in range(size):
if (df.loc[i,'repeated'] == True):
continue
else:
for j in range((i+1),size):
if (df.loc[i,'nature']!=df.loc[j,'nature']) & (df.loc[j,'repeated']==False):
df.loc[j,'repeated'] = True
df.loc[i,'repeated'] = True
break
return df
df.groupby("Account","value").applyInPandas(check, schema="Account string, nature int,value long,time string,repeated boolean").show()
更新1:另一个没有任何迭代的解决方案。
def check(df):
df = df.sort_values('verified_time')
df['index'] = df.index
df['IS_REPEATED'] = 0
df1 = df.sort_values(['nature'],ascending=[True]).reset_index(drop=True)
df2 = df.sort_values(['nature'],ascending=[False]).reset_index(drop=True)
df1['IS_REPEATED']=df1['nature']^df2['nature']
df3 = df1.sort_values(['index'],ascending=[True])
df = df3.drop(['index'],axis=1)
return df
df = df.groupby("account", "value").applyInPandas(gf.check2,schema=gf.get_schema('trx'))
更新2:带有Spark窗口的解决方案:
def is_repeated_feature(df):
windowPartition = Window.partitionBy("account", "value", 'nature').orderBy('nature')
df_1 = df.withColumn('rank', F.row_number().over(windowPartition))
w = (Window
.partitionBy('account', 'value')
.orderBy('nature')
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df_1 = df_1.withColumn("count_nature", F.count('nature').over(w))
df_1 = df_1.withColumn('sum_nature', F.sum('nature').over(w))
df_1 = df_1.select('*')
df_2 = df_1.withColumn('min_val',
when((df_1.sum_nature > (df_1.count_nature - df_1.sum_nature)),
(df_1.count_nature - df_1.sum_nature)).otherwise(df_1.sum_nature))
df_2 = df_2.withColumn('more_than_one', when(df_2.count_nature > 1, '1').otherwise('0'))
df_2 = df_2.withColumn('is_repeated',
when(((df_2.more_than_one == 1) & (df_2.count_nature > df_2.sum_nature) & (
df_2.rank <= df_2.min_val)), '1')
.otherwise('0'))
return df_2