pyspark如何在一个窗口内聚合



我有一个具有以下模式的数据框架

  1. epoch- long(事件的历元舍入到最近的分钟)
  2. client_id- string (self - explitory)
  3. volume- long(一分钟内发生的事件数)

我想添加以下列(称为prev-1h-3m-interval-median) -由client_id分区,我想每分钟查看前60分钟,求和连续3分钟(0- 3,3 -6,…)57-60),并得到总数的中位数。

编辑-增加了单个client_id的示例

<表类> 时代 卷 tbody><<tr>1302773734575663777584495010651197128413181419157116461788181219242035

我已经找到了一种不需要任何udf的可能方法。(代码后说明)

sdf. 
withColumn('flag_odd', (func.col('epoch')%2 != 0).cast('int')). 
withColumn('flag_even', (func.col('epoch')%2 == 0).cast('int')). 
withColumn('sum_curr_and_prev', func.sum('volume').over(wd.orderBy('epoch').rowsBetween(-1, 0))). 
withColumn('even_collected', func.collect_list(func.when(func.col('flag_odd') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). 
withColumn('odd_collected', func.collect_list(func.when(func.col('flag_even') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). 
withColumn('even_collected', func.when((func.col('flag_even') == 1) & (func.col('epoch') > 6), func.col('even_collected'))). 
withColumn('odd_collected', func.when((func.col('flag_odd') == 1) & (func.col('epoch') > 6), func.col('odd_collected'))). 
withColumn('all_collections', func.coalesce('even_collected', 'odd_collected')). 
withColumn('median_val', func.sort_array('all_collections')[1]). 
show()
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# |epoch|volume|flag_odd|flag_even|sum_curr_and_prev|even_collected|  odd_collected|all_collections|median_val|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# |    1|    30|       1|        0|               30|          null|           null|           null|      null|
# |    2|    77|       0|        1|              107|          null|           null|           null|      null|
# |    3|    73|       1|        0|              150|          null|           null|           null|      null|
# |    4|    57|       0|        1|              130|          null|           null|           null|      null|
# |    5|     6|       1|        0|               63|          null|           null|           null|      null|
# |    6|    37|       0|        1|               43|          null|           null|           null|      null|
# |    7|    75|       1|        0|              112|          null| [107, 130, 43]| [107, 130, 43]|       107|
# |    8|    44|       0|        1|              119|[150, 63, 112]|           null| [150, 63, 112]|       112|
# |    9|    50|       1|        0|               94|          null| [130, 43, 119]| [130, 43, 119]|       119|
# |   10|    65|       0|        1|              115| [63, 112, 94]|           null|  [63, 112, 94]|        94|
# |   11|    97|       1|        0|              162|          null| [43, 119, 115]| [43, 119, 115]|       115|
# |   12|    84|       0|        1|              181|[112, 94, 162]|           null| [112, 94, 162]|       112|
# |   13|    18|       1|        0|              102|          null|[119, 115, 181]|[119, 115, 181]|       119|
# |   14|    19|       0|        1|               37|[94, 162, 102]|           null| [94, 162, 102]|       102|
# |   15|    71|       1|        0|               90|          null| [115, 181, 37]| [115, 181, 37]|       115|
# |   16|    46|       0|        1|              117|[162, 102, 90]|           null| [162, 102, 90]|       102|
# |   17|    88|       1|        0|              134|          null| [181, 37, 117]| [181, 37, 117]|       117|
# |   18|    12|       0|        1|              100|[102, 90, 134]|           null| [102, 90, 134]|       102|
# |   19|    24|       1|        0|               36|          null| [37, 117, 100]| [37, 117, 100]|       100|
# |   20|    35|       0|        1|               59| [90, 134, 36]|           null|  [90, 134, 36]|        90|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+

我已经使用了你提供的数据

data = [(1, 30), (2,77), (3,73), (4,57), (5,6), (6,37), (7,75), (8,44), (9,50), (10,65),
(11,97), (12,84), (13,18), (14,19), (15,71), (16,46), (17,88), (18,12), (19,24), (20,35)]
sdf = spark.sparkContext.parallelize(data).toDF(['epoch', 'volume'])
# +-----+------+
# |epoch|volume|
# +-----+------+
# |    1|    30|
# |    2|    77|
# |    3|    73|
# |    4|    57|
# |    5|     6|
# +-----+------+
  • 从那里,我创建奇数和偶数记录标识符。他们会帮助我们创建一个系列。注意,我假设epoch是连续的和有序的。
  • 然后,我对音量值求和。epoch 2 volume和epoch 1 volume, epoch 3 volume和epoch 2 volume,等等。
  • 创建集合-我使用了sql函数collect_list(),其中包含when()条件。我想,对于每个奇数纪元,你收集前3个偶数纪元的成对和的中位数(步骤2),例如-对于纪元7,你需要纪元6的成对和,纪元4的成对和,纪元2的成对和,即[43, 130, 107]。类似的方法用于偶数时代。
  • 一旦有了集合,median将是集合的中间值。该集合是一个包含3个值的列表,可以对其进行排序,并且可以从列表中提取第二个值。例如,[43, 130, 107]可以排序到[43, 107, 130],107可以用[43, 107, 130][1]
  • 提取

最新更新