我有一个数据框X
包含某些事件(时间点,带时间戳(和另一个含有时间范围的dataframe Y
(也由时间戳指定。(
通过实验和一些阅读,我发现直接加入时间戳的基本方法:
return X.join(Y, (X.ts >= Y.start_ts) & (X.ts < Y.end_ts), "inner")
的表现比第一次加入日期慢得多,然后在特定的时间戳上过滤:
X = X.withColumn("event_date", ts.cast('date'))
Y = Y.withColumn("date", explode(array([start_ts.cast('date')), end_ts.cast('date'))])))
return X
.join(Y, (X.event_date == Y.date), "inner")
.filter((X.ts >= Y.start_ts) & (X.ts < Y.end_ts))
就我的理解,在第一个示例中执行加入的基本方法是:
-
ts
和y订单x和y,然后由start_ts
,然后是end_ts
(在spark中可能在nlogn中?( - 然后可以线性执行加入,每个候选人最多有2个
long
比较
在第二个示例中:
- 到目前
- 通过日期(nlogn,最多较小的2x(订购两个数据范围
- 线性执行加入,每个候选人最多1
long
比较 - 线性过滤结果,最多可以每行2个
long
比较
在第二个示例中,从点2的较小常数足以使其加快速度?还是涉及一些使完美行为的火花优化?
对于初学者来说,这两种方法通常在逻辑上不等于,因此在实践中比较其执行时间几乎没有意义。试想一下,Y.start_ts
始终是最小的代表值,而Y.end_ts
是最大的代表值。从上下文中可以清楚地看出,您不允许这种情况,但这是特定领域的知识,而不是计划者很容易推断出的东西。
从实际执行开始,差异很简单,但基本:
- 第一种情况只是执行笛卡尔产品,然后过滤数据,从而始终进行n * m比较。
- 第二种情况将数据调整并比较平均每天的平均录制率 *比较。
有关详细信息,请参见如何提高与火花之间的状态之间的广播连接速度。