我的代码在小数据集(几百万行(上运行良好,但在大数据集(>10亿行(上失败。它抛出的错误是:
Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
我仔细检查了执行人和驾驶员的日志。其中没有任何内容表明这两个大小的数据集之间发生了什么不同。我使用的代码是:
spark_df = spark_df.repartition([KEY COLUMNS])
rdd = spark_df.rdd.mapPartitions(lambda x: process_partition(x))
final_df = spark.createDataFrame(rdd, schema=schema, verifySchema=True)
final_df.write.format("delta").mode([MODE]).save([SAVE_LOCATION])
我尝试了很多东西:
- 更改groupby使组变小
- 增加了群集中计算机的资源
- 注释掉了除1以外的所有";转换";在代码库中
- 更改或添加了以下群集配置选项:
- 备用网络超时10000000
- spark.executer.heartbeat间隔10000000
- 为作业添加了超时:10000000
在整个过程中,错误没有改变,日志似乎没有包含任何有用的信息来帮助我了解发生了什么。
尽管日志和文档对将解决方案与问题联系起来毫无帮助,但解决方案最终非常简单。
默认情况下,Databricks/Spark使用200个分区。对于较小的数据集,这很好。对于较大的数据集,它太小了。解决方案是在重新分区调用中提供所需数量的分区。
spark_df = spark_df.repartition(1000, [KEY_COLUMNS])