关于减少连接加载到spark的数据的想法



所以我有一个输入数据流,它是文件活动的日志。我需要将这些数据与包含每个字段id上的元数据的表连接起来。

作业每小时运行一次。每小时大约有1M个日志,总共大约有5B个文件(或者文件-元数据表中总共有5B行)。这意味着即使在最坏的情况下(每个日志都指向一个唯一的文件),我们也只会看到1M个文件id,因此只需要大约1/5000的文件元数据行。问题是,我们仍然需要将所有5B行加载到内存中,而文件-元数据表的大小约为500GB,每次运行只需10-15分钟即可加载。

1/5000是非常低效的,这个加载时间正在扼杀我们的工作性能和齿轮。感觉应该有一种方法可以加载更少的数据,或者不必每次都加载数据,或者其他一些想法。将其减少5倍至1/1000似乎是一个合理的要求,但我不知道如何实现这一目标。

我相信这在大数据世界中是一个非常普遍的问题,所以我只是想知道有什么策略可以解决这个问题。如果有任何相关的资源,我将不胜感激。

我的想法:

  • 我考虑过将表分成更小的块,并以某种方式按优先级分组,但是有一个性能权衡加载更多的文件,加上即使我将它分成5K文件,每个文件将有1M行,并且这些文件id中没有一个存在于传入日志流中的几率似乎很低。
  • 我想过把表保存在内存中,并且有更长的运行作业,但是像这样保持500GB的内存是一个很大的资源要求。
  • 我想过做更大的批量(在6小时批次中运行作业而不是每小时),但是1M行已经涉及一些繁重的计算,并将其增加到6M可能不是一个好主意。另外,无论如何,我们也想转向流作业。
  • 我们有其他作业使用这个数据,所以我可以想象一个解决方案,这个表被永久加载到一些共享内存中,所有的作业都可以从这个地方读取。问题是我们必须把它分配给执行器来执行连接,这又回到了最初的问题。也许你可以使用bloom过滤器只从共享内存中加载数据的子集到执行器。

最后的想法:

  • 我们希望将其从每小时的批处理作业转变为流作业,因此理想情况下解决方案也可以在那里工作。
  • 我们已经使用了bloom过滤器,但这仍然需要将所有数据加载到内存中,然后从那里过滤它。此外,一旦我们转移到流媒体工作,我不确定我们甚至可以使用布鲁姆过滤器了。

如果您的大文件-元数据表是一个拼花文件,它会自动带谓词下推。有关这个主题的更多信息,请参阅此页。该配置名为spark.sql.parquet.filterPushdown,在最近的Spark版本中默认为true。您只需要在spark.read.parquet调用中添加合理的过滤器。

你确实可以走完整的流路线,然后处理三角洲湖泊甚至可能给你一个更简单的解决方案。

希望这对你有帮助!

最新更新