我每天使用Spark scala程序从Mysql向Hive导入1000万条记录,并比较昨天和今天的数据集。
val yesterdayDf=sqlContext.sql("select * from t_yesterdayProducts");
val todayDf=sqlContext.sql("select * from t_todayProducts");
val diffDf=todayDf.except(yesterdayDf);
我正在使用3个节点的集群和程序,可以很好地处理400万条记录。由于RAM内存不足,我们面临着超过400万的内存不足问题。
我想知道比较两个大型数据集的最佳方法。
你试过找出你有多少分区吗yesterdayDf.rdd.partitions.size
将为您提供昨天Df数据帧的信息,您也可以为其他数据帧提供同样的信息。
您也可以使用yesterdayDf.repartition(1000) // (a large number)
以查看OOM问题是否消失。
这个问题的原因很难说。但问题可能是,由于某种原因,工人们获取了太多的数据。尝试清除数据帧以执行除外操作。根据我在评论中的问题,你说你有关键列,所以只取它们这样的:
val yesterdayDfKey = yesterdayDf.select("key-column")
val todayDfKey = todayDf.select("key-column")
val diffDf=todayDfKey.except(yesterdayDfKey);
这样,您将获得一个带有关键帧的数据帧。然后你可以像这篇文章一样使用join来制作一个过滤器。
您还需要确保您的yarn.nodemanager.resource.memory-mb大于--executor内存。
您也可以尝试使用left_anti连接键上的两个df,然后检查的记录数