主要问题:如何安全地(而不会冒因 OOM 而崩溃的风险(从 Spark 中的驱动程序节点迭代数据帧中的每一行(保证每一行(?我需要控制数据返回时的大小,对其进行操作,然后丢弃它以检索下一批(例如一次 1000 行或其他(
我正在尝试以安全且迭代的方式将潜在大型数据帧中的数据带回驱动程序,以便我可以使用这些数据来执行 HTTP 调用。我一直在尝试使用someDf.foreachPartition{makeApiCall(_)}
并允许执行程序处理调用。它可以工作 - 但事实证明,在生产环境中启动时,调试和处理错误非常困难,尤其是在失败的调用中。
我知道有someDf.collect()
操作,它将所有数据一次带回驱动程序。但是,不建议使用此解决方案,因为如果您的 DF 非常大,则可能会使驱动程序崩溃。
有什么建议吗?
如果数据不适合内存,您可以使用类似以下内容:
df.toLocalIterator().forEachRemaining( row => {makeAPICall(row)})
但与collect
相比,toLocalIterator
有相当大的开销
或者,您可以批量收集数据帧(其功能与toLocalIterator
基本相同(:
val partitions = df.rdd.partitions.map(_.index)
partitions.toStream.foreach(i => df.where(spark_partition_id() === lit(i)).collect().map(row => makeAPICall(row)))
将所有数据带回驱动程序是一个坏主意,因为驱动程序只有 1 个节点,它将成为瓶颈。可伸缩性将丢失。如果您必须这样做,那么如果您真的需要大数据应用程序,请三思而后行?应该不会。
dataframe.collect()
是将数据带给驱动程序的最佳方式,它将带来所有数据。另一种方法是toLocalIterator
,这将带来最大分区的数据,该分区也可能很大。因此,这应该很少使用,并且仅用于少量数据。
如果您坚持,则可以将输出写入文件或队列,并以受控方式读取该文件。这将是我不喜欢的部分可扩展解决方案。