使用spark-scala比较两个大型数据集时出现内存不足问题



我每天使用Spark scala程序从Mysql向Hive导入1000万条记录,并比较昨天和今天的数据集。

val yesterdayDf=sqlContext.sql("select * from t_yesterdayProducts");
val todayDf=sqlContext.sql("select * from t_todayProducts");
val diffDf=todayDf.except(yesterdayDf);

我正在使用3个节点的集群和程序,可以很好地处理400万条记录。由于RAM内存不足,我们面临着超过400万的内存不足问题。

我想知道比较两个大型数据集的最佳方法。

你试过找出你有多少分区吗yesterdayDf.rdd.partitions.size将为您提供昨天Df数据帧的信息,您也可以为其他数据帧提供同样的信息。

您也可以使用yesterdayDf.repartition(1000) // (a large number)以查看OOM问题是否消失。

这个问题的原因很难说。但问题可能是,由于某种原因,工人们获取了太多的数据。尝试清除数据帧以执行除外操作。根据我在评论中的问题,你说你有关键列,所以只取它们这样的:

val yesterdayDfKey = yesterdayDf.select("key-column")
val todayDfKey = todayDf.select("key-column")
val diffDf=todayDfKey.except(yesterdayDfKey);

这样,您将获得一个带有关键帧的数据帧。然后你可以像这篇文章一样使用join来制作一个过滤器。

您还需要确保您的yarn.nodemanager.resource.memory-mb大于--executor内存。

您也可以尝试使用left_anti连接键上的两个df,然后检查的记录数

相关内容

  • 没有找到相关文章

最新更新