如何应用窗口规格,但在2个不同的列(pyspark)上



我正面临一个问题,在我的API中,解决方案并不明显,我想知道最有效的方法是什么。

用例:

我收集了两种类型的推文,并将它们存储在数据框架中(我们将其称为 type1 type2 )。对于这个问题,重要的是 timestamp 发布。对于 type1 的每条推文,我需要获得 type2 的所有推文,这些推文落在某个 /- time-window a type1的时间戳周围,并计算一个取决于每个 type2 Tweet的时间范围内的时间段的度量。

根据我尝试的内容,在时间戳列上的一个简单 WindowsPec 无法在这里工作。如果这样做,我会为每个时间戳获得一个WindowsPec,但是我只需要一定的时间戳记(type1)。我试图为每种类型的推文创建两个时间戳列,然后按Type1时间戳进行排序。但是" rangesbetweet"选项似乎仅在我排序的列上使用(我需要对Type1-Timestamp列进行排序,但是Typebet2weet2中-timestamp列)。

我想出的一个解决方案是创建2个数据范围,每种类型一个。然后,对于 type1 推文,i 收集进入列表中的所有时间戳,对于每个时间戳此列表中的每个时间戳并使用它。这有效,但是在我看来,解决方案似乎效率低下,对于足够大的数据框架,无论如何,收集过程可能会失败。

希望我对问题描述很清楚:)

这些可能是重叠的窗口,因此partitionBy不可能,并且帧不能是"动态"(它们不能基于当前行的时间戳值)。

在我看来,您应该使用Kafka并在批处理进行操作。但是,由于您已经拥有数据框架,让我们无论如何尝试。

收集值不是正确的解决方案(它带来了驾驶员内存中的所有内容)。另一种可能性是进行笛卡尔连接,这是非常昂贵的,您最终会在最终操作中过滤很多线。我们可以通过创建自己的动态批处理来解决问题。

首先,让我们创建两个类型推文1和2的示例数据框:

import datetime as dt
import numpy as np
np.random.seed(0)
time_deltas1 = np.random.randint(0, 10, 5).cumsum()
time_deltas2 = np.random.randint(0, 10, 20).cumsum()
df1 = spark.createDataFrame(
    sc.parallelize(zip(
        [dt.datetime(2017,1,1,0,0,0) + dt.timedelta(minutes=4*int(x)) for x in time_deltas1], 
        [chr(c) for c in range(ord('a'), ord('f'))]
    )), 
    ["ts1", "text1"]
)
df2 = spark.createDataFrame(
    sc.parallelize(zip(
        [dt.datetime(2017,1,1,0,0,0) + dt.timedelta(minutes=int(x)) + dt.timedelta(seconds = 30) for x in time_deltas2],
        [chr(c) for c in range(ord('f'), ord('z'))]
    )), 
    ["ts2", "text2"]
)
    +-------------------+-----+
    |                ts1|text1|
    +-------------------+-----+
    |2017-01-01 00:20:00|    a|
    |2017-01-01 00:20:00|    b|
    |2017-01-01 00:32:00|    c|
    |2017-01-01 00:44:00|    d|
    |2017-01-01 01:12:00|    e|
    +-------------------+-----+
    +-------------------+-----+
    |                ts2|text2|
    +-------------------+-----+
    |2017-01-01 00:09:30|    f|
    |2017-01-01 00:12:30|    g|
    |2017-01-01 00:17:30|    h|
    |2017-01-01 00:19:30|    i|
    |2017-01-01 00:23:30|    j|
    |2017-01-01 00:30:30|    k|
    |2017-01-01 00:36:30|    l|
    |2017-01-01 00:44:30|    m|
    |2017-01-01 00:52:30|    n|
    |2017-01-01 00:53:30|    o|
    |2017-01-01 00:59:30|    p|
    |2017-01-01 01:06:30|    q|
    |2017-01-01 01:13:30|    r|
    |2017-01-01 01:21:30|    s|
    |2017-01-01 01:22:30|    t|
    |2017-01-01 01:27:30|    u|
    |2017-01-01 01:36:30|    v|
    |2017-01-01 01:44:30|    w|
    |2017-01-01 01:53:30|    x|
    |2017-01-01 01:57:30|    y|
    +-------------------+-----+

假设我们的窗口应为/- n分钟,因此我们想要一个2*n分钟窗口。

首先,我们将timestamps施加为int,然后将它们圆将其围成两个最近的2*n分钟时间戳:

import pyspark.sql.functions as psf
n = 5*60   # for +/- 5 minutes  
df1 = df1.withColumn("ts1", psf.unix_timestamp(df1.ts1)).withColumn(
    "time_range", 
    psf.explode(psf.array((psf.col("ts1")/(2*n)).cast("int") * (2*n), ((1 + psf.col("ts1")/(2*n)).cast("int")) * (2*n)))
)
df2 = df2.withColumn("ts2", psf.unix_timestamp(df2.ts2)).withColumn(
    "time_range", 
    psf.explode(psf.array((psf.col("ts2")/(2*n)).cast("int") * (2*n), ((1 + psf.col("ts2")/(2*n)).cast("int")) * (2*n)))
)

将UNIX时间戳转换为时间戳以可视化:

    +-------------------+-----+-------------------+
    |                ts1|text1|         time_range|
    +-------------------+-----+-------------------+
    |2017-01-01 00:20:00|    a|2017-01-01 00:20:00|
    |2017-01-01 00:20:00|    a|2017-01-01 00:30:00|
    |2017-01-01 00:20:00|    b|2017-01-01 00:20:00|
    |2017-01-01 00:20:00|    b|2017-01-01 00:30:00|
    |2017-01-01 00:32:00|    c|2017-01-01 00:30:00|
    |2017-01-01 00:32:00|    c|2017-01-01 00:40:00|
    |2017-01-01 00:44:00|    d|2017-01-01 00:40:00|
    |2017-01-01 00:44:00|    d|2017-01-01 00:50:00|
    |2017-01-01 01:12:00|    e|2017-01-01 01:10:00|
    |2017-01-01 01:12:00|    e|2017-01-01 01:20:00|
    +-------------------+-----+-------------------+
    +-------------------+-----+-------------------+
    |                ts2|text2|         time_range|
    +-------------------+-----+-------------------+
    |2017-01-01 00:09:30|    f|2017-01-01 00:00:00|
    |2017-01-01 00:09:30|    f|2017-01-01 00:10:00|
    |2017-01-01 00:12:30|    g|2017-01-01 00:10:00|
    |2017-01-01 00:12:30|    g|2017-01-01 00:20:00|
    |2017-01-01 00:17:30|    h|2017-01-01 00:10:00|
    |2017-01-01 00:17:30|    h|2017-01-01 00:20:00|
    |2017-01-01 00:19:30|    i|2017-01-01 00:10:00|
    |2017-01-01 00:19:30|    i|2017-01-01 00:20:00|
    |2017-01-01 00:23:30|    j|2017-01-01 00:20:00|
    |2017-01-01 00:23:30|    j|2017-01-01 00:30:00|
    |2017-01-01 00:30:30|    k|2017-01-01 00:30:00|
    |2017-01-01 00:30:30|    k|2017-01-01 00:40:00|
    |2017-01-01 00:36:30|    l|2017-01-01 00:30:00|
    |2017-01-01 00:36:30|    l|2017-01-01 00:40:00|
    |2017-01-01 00:44:30|    m|2017-01-01 00:40:00|
    |2017-01-01 00:44:30|    m|2017-01-01 00:50:00|
    |2017-01-01 00:52:30|    n|2017-01-01 00:50:00|
    |2017-01-01 00:52:30|    n|2017-01-01 01:00:00|
    |2017-01-01 00:53:30|    o|2017-01-01 00:50:00|
    |2017-01-01 00:53:30|    o|2017-01-01 01:00:00|
    |2017-01-01 00:59:30|    p|2017-01-01 00:50:00|
    |2017-01-01 00:59:30|    p|2017-01-01 01:00:00|
    |2017-01-01 01:06:30|    q|2017-01-01 01:00:00|
    |2017-01-01 01:06:30|    q|2017-01-01 01:10:00|
    |2017-01-01 01:13:30|    r|2017-01-01 01:10:00|
    |2017-01-01 01:13:30|    r|2017-01-01 01:20:00|
    |2017-01-01 01:21:30|    s|2017-01-01 01:20:00|
    |2017-01-01 01:21:30|    s|2017-01-01 01:30:00|
    |2017-01-01 01:22:30|    t|2017-01-01 01:20:00|
    |2017-01-01 01:22:30|    t|2017-01-01 01:30:00|
    |2017-01-01 01:27:30|    u|2017-01-01 01:20:00|
    |2017-01-01 01:27:30|    u|2017-01-01 01:30:00|
    |2017-01-01 01:36:30|    v|2017-01-01 01:30:00|
    |2017-01-01 01:36:30|    v|2017-01-01 01:40:00|
    |2017-01-01 01:44:30|    w|2017-01-01 01:40:00|
    |2017-01-01 01:44:30|    w|2017-01-01 01:50:00|
    |2017-01-01 01:53:30|    x|2017-01-01 01:50:00|
    |2017-01-01 01:53:30|    x|2017-01-01 02:00:00|
    |2017-01-01 01:57:30|    y|2017-01-01 01:50:00|
    |2017-01-01 01:57:30|    y|2017-01-01 02:00:00|
    +-------------------+-----+-------------------+

我们现在可以加入2个数据范围,最后一个表将比笛卡尔连接表小,并在|ts1 - ts2| <= n min上过滤:

df = df1.join(df2, "time_range").filter(
    (psf.abs(df1.ts1 - df2.ts2) <= n) | (psf.isnull(df2.ts2))
).withColumn("ts1", psf.from_unixtime("ts1")).withColumn("ts2", psf.from_unixtime("ts2"))

将时间戳转换回时间戳格式后,我们可以收集2型的不同推文:

df = df.groupBy("ts1", "text1").agg(
    psf.collect_set(psf.struct("ts2", "text2")).alias("tweet2")
)
    +-------------------+-----+---------------------------------------------------------------------------+
    |ts1                |text1|tweet2                                                                     |
    +-------------------+-----+---------------------------------------------------------------------------+
    |2017-01-01 00:20:00|a    |[[2017-01-01 00:17:30,h], [2017-01-01 00:23:30,j], [2017-01-01 00:19:30,i]]|
    |2017-01-01 00:20:00|b    |[[2017-01-01 00:17:30,h], [2017-01-01 00:23:30,j], [2017-01-01 00:19:30,i]]|
    |2017-01-01 00:32:00|c    |[[2017-01-01 00:30:30,k], [2017-01-01 00:36:30,l]]                         |
    |2017-01-01 00:44:00|d    |[[2017-01-01 00:44:30,m]]                                                  |
    |2017-01-01 01:12:00|e    |[[2017-01-01 01:13:30,r]]                                                  |
    +-------------------+-----+---------------------------------------------------------------------------+

相关内容

  • 没有找到相关文章

最新更新