我需要在多个列上创建一个event_id(v_id,d_id,ip,l_id),并在delta> 40时将其递增以获取像这样的输出
v_id d_id ip l_id delta event_id last_event_flag1 20 30 40 1 1 N1 20 30 40 2 1 N1 20 30 40 3 1 N1 20 30 40 4 1 y1 20 20 40 1 1 y1 30 30 40 2 1 N1 30 30 40 3 1 N1 30 30 40 4 1 N1 30 30 40 5 1 y
我能够使用PANDAS数据框架实现此目标
df['event_id'] = (df.delta >=40.0).groupby([df.l_id,df.v_id,d_id,ip]).cumsum() + 1
df.append(df['event_id'], ignore_index=True
但是,在较大数据上执行内存错误。
如何在pyspark中做类似的事情。
在Pyspark中您可以使用window
函数进行操作:
首先,让我们创建数据框。请注意,您也可以将其直接从CSV中直接加载为数据框:
df = spark.createDataFrame(
sc.parallelize(
[[1,20,30,40,1,1],
[1,20,30,40,2,1],
[1,20,30,40,3,1],
[1,20,30,40,4,1],
[1,20,30,40,45,2],
[1,20,30,40,1,2],
[1,30,30,40,2,1],
[1,30,30,40,3,1],
[1,30,30,40,4,1],
[1,30,30,40,5,1]]
),
["v_id","d_id","ip","l_id","delta","event_id"]
)
您的桌子上有一个隐式的顺序,我们需要创建一个单调增加的ID,以便我们最终不会绕过它:
import pyspark.sql.functions as psf
df = df.withColumn(
"rn",
psf.monotonically_increasing_id()
)
+----+----+---+----+-----+--------+----------+
|v_id|d_id| ip|l_id|delta|event_id| rn|
+----+----+---+----+-----+--------+----------+
| 1| 20| 30| 40| 1| 1| 0|
| 1| 20| 30| 40| 2| 1| 1|
| 1| 20| 30| 40| 3| 1| 2|
| 1| 20| 30| 40| 4| 1| 3|
| 1| 20| 30| 40| 45| 2| 4|
| 1| 20| 30| 40| 1| 2|8589934592|
| 1| 30| 30| 40| 2| 1|8589934593|
| 1| 30| 30| 40| 3| 1|8589934594|
| 1| 30| 30| 40| 4| 1|8589934595|
| 1| 30| 30| 40| 5| 1|8589934596|
+----+----+---+----+-----+--------+----------+
现在要计算event_id
和last_event_flag
:
from pyspark.sql import Window
w1 = Window.partitionBy("v_id", "d_id", "l_id", "ip").orderBy("rn")
w2 = Window.partitionBy("v_id", "d_id", "l_id", "ip").orderBy(psf.desc("rn"))
df.withColumn(
"event_id",
psf.sum((df.delta >= 40).cast("int")).over(w1) + 1
).withColumn(
"last_event_flag",
psf.row_number().over(w2) == 1
).drop("rn")
+----+----+---+----+-----+--------+---------------+
|v_id|d_id| ip|l_id|delta|event_id|last_event_flag|
+----+----+---+----+-----+--------+---------------+
| 1| 20| 30| 40| 1| 1| false|
| 1| 20| 30| 40| 2| 1| false|
| 1| 20| 30| 40| 3| 1| false|
| 1| 20| 30| 40| 4| 1| false|
| 1| 20| 30| 40| 45| 2| false|
| 1| 20| 30| 40| 1| 2| true|
| 1| 30| 30| 40| 2| 1| false|
| 1| 30| 30| 40| 3| 1| false|
| 1| 30| 30| 40| 4| 1| false|
| 1| 30| 30| 40| 5| 1| true|
+----+----+---+----+-----+--------+---------------+
也许您应该在运行groupby之前计算df = df = df [df.delta> = 40] - 我不确定是否重要。
您还可以研究基于CSV的块以进行内存效率来执行计算。因此,您可以将数据分解为10000行的块,然后运行计算以避免记忆错误。
https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
如何使用pandas读取6 GB CSV文件