我正面临一个问题,在我的API中,解决方案并不明显,我想知道最有效的方法是什么。
用例:
我收集了两种类型的推文,并将它们存储在数据框架中(我们将其称为 type1 , type2 )。对于这个问题,重要的是 timestamp 发布。对于 type1 的每条推文,我需要获得 type2 的所有推文,这些推文落在某个 /- time-window a type1的时间戳周围,并计算一个取决于每个 type2 Tweet的时间范围内的时间段的度量。
根据我尝试的内容,在时间戳列上的一个简单 WindowsPec 无法在这里工作。如果这样做,我会为每个时间戳获得一个WindowsPec,但是我只需要一定的时间戳记(type1)。我试图为每种类型的推文创建两个时间戳列,然后按Type1时间戳进行排序。但是" rangesbetweet"选项似乎仅在我排序的列上使用(我需要对Type1-Timestamp列进行排序,但是Typebet2weet2中-timestamp列)。
我想出的一个解决方案是创建2个数据范围,每种类型一个。然后,对于 type1 推文,i 收集进入列表中的所有时间戳,对于每个时间戳此列表中的每个时间戳并使用它。这有效,但是在我看来,解决方案似乎效率低下,对于足够大的数据框架,无论如何,收集过程可能会失败。
希望我对问题描述很清楚:)
这些可能是重叠的窗口,因此partitionBy
不可能,并且帧不能是"动态"(它们不能基于当前行的时间戳值)。
在我看来,您应该使用Kafka
并在批处理进行操作。但是,由于您已经拥有数据框架,让我们无论如何尝试。
收集值不是正确的解决方案(它带来了驾驶员内存中的所有内容)。另一种可能性是进行笛卡尔连接,这是非常昂贵的,您最终会在最终操作中过滤很多线。我们可以通过创建自己的动态批处理来解决问题。
首先,让我们创建两个类型推文1和2的示例数据框:
import datetime as dt
import numpy as np
np.random.seed(0)
time_deltas1 = np.random.randint(0, 10, 5).cumsum()
time_deltas2 = np.random.randint(0, 10, 20).cumsum()
df1 = spark.createDataFrame(
sc.parallelize(zip(
[dt.datetime(2017,1,1,0,0,0) + dt.timedelta(minutes=4*int(x)) for x in time_deltas1],
[chr(c) for c in range(ord('a'), ord('f'))]
)),
["ts1", "text1"]
)
df2 = spark.createDataFrame(
sc.parallelize(zip(
[dt.datetime(2017,1,1,0,0,0) + dt.timedelta(minutes=int(x)) + dt.timedelta(seconds = 30) for x in time_deltas2],
[chr(c) for c in range(ord('f'), ord('z'))]
)),
["ts2", "text2"]
)
+-------------------+-----+
| ts1|text1|
+-------------------+-----+
|2017-01-01 00:20:00| a|
|2017-01-01 00:20:00| b|
|2017-01-01 00:32:00| c|
|2017-01-01 00:44:00| d|
|2017-01-01 01:12:00| e|
+-------------------+-----+
+-------------------+-----+
| ts2|text2|
+-------------------+-----+
|2017-01-01 00:09:30| f|
|2017-01-01 00:12:30| g|
|2017-01-01 00:17:30| h|
|2017-01-01 00:19:30| i|
|2017-01-01 00:23:30| j|
|2017-01-01 00:30:30| k|
|2017-01-01 00:36:30| l|
|2017-01-01 00:44:30| m|
|2017-01-01 00:52:30| n|
|2017-01-01 00:53:30| o|
|2017-01-01 00:59:30| p|
|2017-01-01 01:06:30| q|
|2017-01-01 01:13:30| r|
|2017-01-01 01:21:30| s|
|2017-01-01 01:22:30| t|
|2017-01-01 01:27:30| u|
|2017-01-01 01:36:30| v|
|2017-01-01 01:44:30| w|
|2017-01-01 01:53:30| x|
|2017-01-01 01:57:30| y|
+-------------------+-----+
假设我们的窗口应为/- n
分钟,因此我们想要一个2*n
分钟窗口。
首先,我们将timestamps
施加为int
,然后将它们圆将其围成两个最近的2*n
分钟时间戳:
import pyspark.sql.functions as psf
n = 5*60 # for +/- 5 minutes
df1 = df1.withColumn("ts1", psf.unix_timestamp(df1.ts1)).withColumn(
"time_range",
psf.explode(psf.array((psf.col("ts1")/(2*n)).cast("int") * (2*n), ((1 + psf.col("ts1")/(2*n)).cast("int")) * (2*n)))
)
df2 = df2.withColumn("ts2", psf.unix_timestamp(df2.ts2)).withColumn(
"time_range",
psf.explode(psf.array((psf.col("ts2")/(2*n)).cast("int") * (2*n), ((1 + psf.col("ts2")/(2*n)).cast("int")) * (2*n)))
)
将UNIX时间戳转换为时间戳以可视化:
+-------------------+-----+-------------------+
| ts1|text1| time_range|
+-------------------+-----+-------------------+
|2017-01-01 00:20:00| a|2017-01-01 00:20:00|
|2017-01-01 00:20:00| a|2017-01-01 00:30:00|
|2017-01-01 00:20:00| b|2017-01-01 00:20:00|
|2017-01-01 00:20:00| b|2017-01-01 00:30:00|
|2017-01-01 00:32:00| c|2017-01-01 00:30:00|
|2017-01-01 00:32:00| c|2017-01-01 00:40:00|
|2017-01-01 00:44:00| d|2017-01-01 00:40:00|
|2017-01-01 00:44:00| d|2017-01-01 00:50:00|
|2017-01-01 01:12:00| e|2017-01-01 01:10:00|
|2017-01-01 01:12:00| e|2017-01-01 01:20:00|
+-------------------+-----+-------------------+
+-------------------+-----+-------------------+
| ts2|text2| time_range|
+-------------------+-----+-------------------+
|2017-01-01 00:09:30| f|2017-01-01 00:00:00|
|2017-01-01 00:09:30| f|2017-01-01 00:10:00|
|2017-01-01 00:12:30| g|2017-01-01 00:10:00|
|2017-01-01 00:12:30| g|2017-01-01 00:20:00|
|2017-01-01 00:17:30| h|2017-01-01 00:10:00|
|2017-01-01 00:17:30| h|2017-01-01 00:20:00|
|2017-01-01 00:19:30| i|2017-01-01 00:10:00|
|2017-01-01 00:19:30| i|2017-01-01 00:20:00|
|2017-01-01 00:23:30| j|2017-01-01 00:20:00|
|2017-01-01 00:23:30| j|2017-01-01 00:30:00|
|2017-01-01 00:30:30| k|2017-01-01 00:30:00|
|2017-01-01 00:30:30| k|2017-01-01 00:40:00|
|2017-01-01 00:36:30| l|2017-01-01 00:30:00|
|2017-01-01 00:36:30| l|2017-01-01 00:40:00|
|2017-01-01 00:44:30| m|2017-01-01 00:40:00|
|2017-01-01 00:44:30| m|2017-01-01 00:50:00|
|2017-01-01 00:52:30| n|2017-01-01 00:50:00|
|2017-01-01 00:52:30| n|2017-01-01 01:00:00|
|2017-01-01 00:53:30| o|2017-01-01 00:50:00|
|2017-01-01 00:53:30| o|2017-01-01 01:00:00|
|2017-01-01 00:59:30| p|2017-01-01 00:50:00|
|2017-01-01 00:59:30| p|2017-01-01 01:00:00|
|2017-01-01 01:06:30| q|2017-01-01 01:00:00|
|2017-01-01 01:06:30| q|2017-01-01 01:10:00|
|2017-01-01 01:13:30| r|2017-01-01 01:10:00|
|2017-01-01 01:13:30| r|2017-01-01 01:20:00|
|2017-01-01 01:21:30| s|2017-01-01 01:20:00|
|2017-01-01 01:21:30| s|2017-01-01 01:30:00|
|2017-01-01 01:22:30| t|2017-01-01 01:20:00|
|2017-01-01 01:22:30| t|2017-01-01 01:30:00|
|2017-01-01 01:27:30| u|2017-01-01 01:20:00|
|2017-01-01 01:27:30| u|2017-01-01 01:30:00|
|2017-01-01 01:36:30| v|2017-01-01 01:30:00|
|2017-01-01 01:36:30| v|2017-01-01 01:40:00|
|2017-01-01 01:44:30| w|2017-01-01 01:40:00|
|2017-01-01 01:44:30| w|2017-01-01 01:50:00|
|2017-01-01 01:53:30| x|2017-01-01 01:50:00|
|2017-01-01 01:53:30| x|2017-01-01 02:00:00|
|2017-01-01 01:57:30| y|2017-01-01 01:50:00|
|2017-01-01 01:57:30| y|2017-01-01 02:00:00|
+-------------------+-----+-------------------+
我们现在可以加入2个数据范围,最后一个表将比笛卡尔连接表小,并在|ts1 - ts2| <= n min
上过滤:
df = df1.join(df2, "time_range").filter(
(psf.abs(df1.ts1 - df2.ts2) <= n) | (psf.isnull(df2.ts2))
).withColumn("ts1", psf.from_unixtime("ts1")).withColumn("ts2", psf.from_unixtime("ts2"))
将时间戳转换回时间戳格式后,我们可以收集2型的不同推文:
df = df.groupBy("ts1", "text1").agg(
psf.collect_set(psf.struct("ts2", "text2")).alias("tweet2")
)
+-------------------+-----+---------------------------------------------------------------------------+
|ts1 |text1|tweet2 |
+-------------------+-----+---------------------------------------------------------------------------+
|2017-01-01 00:20:00|a |[[2017-01-01 00:17:30,h], [2017-01-01 00:23:30,j], [2017-01-01 00:19:30,i]]|
|2017-01-01 00:20:00|b |[[2017-01-01 00:17:30,h], [2017-01-01 00:23:30,j], [2017-01-01 00:19:30,i]]|
|2017-01-01 00:32:00|c |[[2017-01-01 00:30:30,k], [2017-01-01 00:36:30,l]] |
|2017-01-01 00:44:00|d |[[2017-01-01 00:44:30,m]] |
|2017-01-01 01:12:00|e |[[2017-01-01 01:13:30,r]] |
+-------------------+-----+---------------------------------------------------------------------------+