我的数据集约为240万行,每行唯一的键。我在其他一些表上执行了一些复杂的SQL查询,并产生了一个带有两个列的数据集,一个键和值true
。该数据集约为500行。现在,我想(外部)使用我的原始表加入此数据集。
这会产生一个具有非常稀疏的值集的新表(在大约500行,其他地方为无效)。
最后,我想做大约200次,给我一个大约201列的最终表(钥匙,以及200列稀疏列)。
当我运行此操作时,我注意到它运行时会变慢得多。第一个加入需要2秒,然后需要4秒,然后是6,然后是10秒,然后是20秒,然后加入大约30个系统,系统将永远不会恢复。当然,实际数字是无关紧要的,因为这取决于我正在运行的群集,但我想知道:
- 期望这种放缓?
- 我正在使用parquet作为数据存储格式(列存储),所以我希望添加更多的列可以水平扩展,这是一个正确的假设吗?
- 到目前为止我加入的所有列都不需要第n个加入,可以从内存中卸载它们吗?
- 在Spark中组合很多列时,我还能做其他事情吗?
- 在循环中的每个加入上调用
explain
表明每个联接变得越来越复杂(似乎包括所有以前的加入,它也包括复杂的SQL查询,即使这些查询是checkpointed
)。有没有办法真正检查点,所以每个联接只是一个联接?我实际上每次加入后都在打电话show()
,所以我以为加入实际上是在此时发生的。
这是预期的放缓
是的,在某种程度上是。加入属于数据密集型系统中最昂贵的操作(声称线性可伸缩性通常使加入桌面的产品并非巧合)。分布式系统中的类似JOIN的操作通常需要击中一堆高潜伏期的节点之间的数据交换。
在SPARK SQL中,计算执行计划还具有额外的成本,该计划比线性复杂性大。
我正在使用parquet作为数据存储格式(列存储),所以我希望添加更多的列可以水平扩展,这是一个正确的假设吗?
否。输入格式根本不会影响加入逻辑。
到目前为止我加入的所有列都不需要第n个加入,可以从内存中卸载吗?
如果真正排除在最终输出中,将从执行计划中修剪它们。但是由于您是有原因的,所以我认为不是这样,最终输出需要。
有没有办法真正检查点的方法,所以每个加入只是一个联接?每次加入后,我实际上是在打电话show(),所以我以为加入实际上是在那时发生的。
show
仅计算输出所需的一小部分数据。它不会缓存,尽管可以重复使用洗牌文件。
(似乎包括所有以前的加入,它还包括复杂的SQL查询,即使已检查了这些查询)。
仅在完全计算数据并且不要从执行计划中删除阶段时创建检查点。如果您想明确地进行操作,请写入部分结果以持续存储并在每次迭代的开头中读回它(可能是过度杀伤)。
在Spark中组合许多列时,我还能做其他事情?
您可以做的最好的事情是找到一种避免完全加入的方法。如果键总是相同的,则单个洗牌,并且在组/分区(使用byKey
方法,窗口函数)上操作可能是更好的选择。
但是,如果您
有一个约24m行的数据集
然后使用支持就地修改的非分布系统可能是更好的选择。
在最幼稚的实现中,您可以单独计算每个聚合,按键排序并写入磁盘。然后可以通过无忽略的内存足迹来排队将数据合并在一起。