为什么在时间戳上加入日期 过滤器比在Spark上加入时端的速度更快



我有一个数据框X包含某些事件(时间点,带时间戳(和另一个含有时间范围的dataframe Y(也由时间戳指定。(

通过实验和一些阅读,我发现直接加入时间戳的基本方法:

return X.join(Y, (X.ts >= Y.start_ts) & (X.ts < Y.end_ts), "inner")

的表现比第一次加入日期慢得多,然后在特定的时间戳上过滤:

X = X.withColumn("event_date", ts.cast('date'))
Y = Y.withColumn("date", explode(array([start_ts.cast('date')), end_ts.cast('date'))])))
return X 
    .join(Y, (X.event_date == Y.date), "inner") 
    .filter((X.ts >= Y.start_ts) & (X.ts < Y.end_ts))

就我的理解,在第一个示例中执行加入的基本方法是:

  1. ts和y订单x和y,然后由 start_ts,然后是 end_ts(在spark中可能在nlogn中?(
  2. 然后可以线性执行加入,每个候选人最多有2个long比较

在第二个示例中:

  1. 到目前
  2. 通过日期(nlogn,最多较小的2x(订购两个数据范围
  3. 线性执行加入,每个候选人最多1 long比较
  4. 线性过滤结果,最多可以每行2个long比较

在第二个示例中,从点2的较小常数足以使其加快速度?还是涉及一些使完美行为的火花优化?

对于初学者来说,这两种方法通常在逻辑上不等于,因此在实践中比较其执行时间几乎没有意义。试想一下,Y.start_ts始终是最小的代表值,而Y.end_ts是最大的代表值。从上下文中可以清楚地看出,您不允许这种情况,但这是特定领域的知识,而不是计划者很容易推断出的东西。

从实际执行开始,差异很简单,但基本:

  • 第一种情况只是执行笛卡尔产品,然后过滤数据,从而始终进行n * m比较。
  • 第二种情况将数据调整并比较平均每天的平均录制率 *比较。

有关详细信息,请参见如何提高与火花之间的状态之间的广播连接速度。

最新更新