根据条件,在多个列上分组pyspark中的累积总和函数



我需要在多个列上创建一个event_id(v_id,d_id,ip,l_id),并在delta> 40时将其递增以获取像这样的输出

v_id d_id ip l_id delta event_id last_event_flag1 20 30 40 1 1 N1 20 30 40 2 1 N1 20 30 40 3 1 N1 20 30 40 4 1 y1 20 20 40 1 1 y1 30 30 40 2 1 N1 30 30 40 3 1 N1 30 30 40 4 1 N1 30 30 40 5 1 y

我能够使用PANDAS数据框架实现此目标

df['event_id'] = (df.delta >=40.0).groupby([df.l_id,df.v_id,d_id,ip]).cumsum() + 1
df.append(df['event_id'], ignore_index=True

但是,在较大数据上执行内存错误。

如何在pyspark中做类似的事情。

在Pyspark中您可以使用window函数进行操作:

首先,让我们创建数据框。请注意,您也可以将其直接从CSV中直接加载为数据框:

df = spark.createDataFrame(
    sc.parallelize(
        [[1,20,30,40,1,1],
        [1,20,30,40,2,1],
        [1,20,30,40,3,1],
        [1,20,30,40,4,1],
        [1,20,30,40,45,2],
        [1,20,30,40,1,2],
        [1,30,30,40,2,1],
        [1,30,30,40,3,1],
        [1,30,30,40,4,1],
        [1,30,30,40,5,1]]
    ), 
    ["v_id","d_id","ip","l_id","delta","event_id"]
)

您的桌子上有一个隐式的顺序,我们需要创建一个单调增加的ID,以便我们最终不会绕过它:

import pyspark.sql.functions as psf
df = df.withColumn(
    "rn", 
    psf.monotonically_increasing_id()
)
    +----+----+---+----+-----+--------+----------+
    |v_id|d_id| ip|l_id|delta|event_id|        rn|
    +----+----+---+----+-----+--------+----------+
    |   1|  20| 30|  40|    1|       1|         0|
    |   1|  20| 30|  40|    2|       1|         1|
    |   1|  20| 30|  40|    3|       1|         2|
    |   1|  20| 30|  40|    4|       1|         3|
    |   1|  20| 30|  40|   45|       2|         4|
    |   1|  20| 30|  40|    1|       2|8589934592|
    |   1|  30| 30|  40|    2|       1|8589934593|
    |   1|  30| 30|  40|    3|       1|8589934594|
    |   1|  30| 30|  40|    4|       1|8589934595|
    |   1|  30| 30|  40|    5|       1|8589934596|
    +----+----+---+----+-----+--------+----------+

现在要计算event_idlast_event_flag

from pyspark.sql import Window
w1 = Window.partitionBy("v_id", "d_id", "l_id", "ip").orderBy("rn")
w2 = Window.partitionBy("v_id", "d_id", "l_id", "ip").orderBy(psf.desc("rn"))
df.withColumn(
    "event_id", 
    psf.sum((df.delta >= 40).cast("int")).over(w1) + 1
).withColumn(
    "last_event_flag", 
    psf.row_number().over(w2) == 1
).drop("rn")
    +----+----+---+----+-----+--------+---------------+
    |v_id|d_id| ip|l_id|delta|event_id|last_event_flag|
    +----+----+---+----+-----+--------+---------------+
    |   1|  20| 30|  40|    1|       1|          false|
    |   1|  20| 30|  40|    2|       1|          false|
    |   1|  20| 30|  40|    3|       1|          false|
    |   1|  20| 30|  40|    4|       1|          false|
    |   1|  20| 30|  40|   45|       2|          false|
    |   1|  20| 30|  40|    1|       2|           true|
    |   1|  30| 30|  40|    2|       1|          false|
    |   1|  30| 30|  40|    3|       1|          false|
    |   1|  30| 30|  40|    4|       1|          false|
    |   1|  30| 30|  40|    5|       1|           true|
    +----+----+---+----+-----+--------+---------------+

也许您应该在运行groupby之前计算df = df = df [df.delta> = 40] - 我不确定是否重要。

您还可以研究基于CSV的块以进行内存效率来执行计算。因此,您可以将数据分解为10000行的块,然后运行计算以避免记忆错误。

https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html

如何使用pandas读取6 GB CSV文件

相关内容

  • 没有找到相关文章