我在Spark中将一些DataFrames连接在一起,但我一直收到以下错误:
PartitioningCollection requires all of its partitionings have the same numPartitions.
在我将两个DataFrame连接在一起之后,似乎会发生这样的情况,每个DataFrame本身似乎都相当合理,但在连接它们之后,如果我试图从连接的DataFrame中获取一行,我会收到这个错误。我真的只是想了解为什么会出现这个错误,或者它背后的含义是什么,因为我似乎找不到任何文档。
以下调用导致此异常:
val resultDataframe = dataFrame1
.join(dataFrame2,
$"first_column" === $"second_column").take(2)
但我当然可以打电话给
dataFrame1.take(2)
和
dataFrame2.take(2)
我还尝试对DataFrames
进行重新分区,在加入之前在dataFrame1
和dataFrame2
上使用Dataset.repartition(numPartitions)
或Dataset.coalesce(numParitions)
,在加入之后在resultDataFrame
上使用,但似乎没有任何影响错误。我还没能找到其他人在粗略搜索后出现错误的参考。。。
在过去的几天里,我遇到了同样的问题,当我在互联网上找不到参考资料时,我很失望。直到你的!
我要补充的几件事是:在对数据帧(多个联接(进行了一组非常复杂的操作后,我出现了错误。此外,这些操作涉及从同一父数据帧生成的数据帧。我试图用一个最小的例子来复制它,但从我的管道中提取它并不是一件小事。
我怀疑,当DAG变得过于复杂时,Spark可能在计算正确的计划时遇到了麻烦。不幸的是,如果这是Spark 2.0.0中的一个错误,那么夜间版本似乎还没有修复它(几天前我尝试过2.0.2版本的快照(。
解决该问题(暂时(的一个实用解决方案似乎是:(在某个时候(将管道中的一些数据帧写入磁盘,然后再次读取。这有效地迫使Spark有一个更小、更易于管理的计划来优化,而且它不再崩溃。当然,这只是暂时的解决办法。
我也遇到过同样的问题。对我来说,这是在从联接的select部分(而不是联接子句本身(中删除一些列之后发生的。
我能够通过在数据帧上调用.repartition()
来修复它。
是否调用缓存方法?
只有当我使用缓存方法时,才会出现这个问题。如果我不调用这个方法,我可以毫无问题地使用数据。
这个问题是关于Spark 2.3.0 中修复的ReorderJoinPredicates
的