Pyspark-df.cache().count()需要很长时间才能运行



我正试图使用我在线阅读的计数方法来强制对PySpark进行热切的评估:

spark_df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
spark_df.cache().count()

但是,当我尝试运行代码时,缓存计数部分需要很长时间才能运行。我的数据大小相对较小(2.7GB,15密耳行(,但运行了28分钟后,我决定放弃这项工作。相比之下,当我使用pandas.read_sql((方法读取数据时,只花了6分43秒。

我运行代码的机器非常强大(20 vCPU,160 GB RAM,Windows操作系统(。我想我错过了加快计数的一步。

如有任何帮助或建议,我们将不胜感激。

当您使用panda进行读取时,它将从机器的可用内存中使用尽可能多的内存(如您所述,假设全部为160Gb,远大于数据本身~3Gb(。

然而,Spark却不一样。当你启动你的Spark会话时,通常你必须提前提到每个执行器(以及驱动程序和应用程序管理器,如果适用(想要使用多少内存,如果你没有指定,根据最新的Spark文档,它将是1Gb。因此,你想做的第一件事就是给你的遗嘱执行人和司机更多的记忆。

其次,Spark从JDBC中读取是很棘手的,因为速度慢与否取决于执行器(和任务(的数量,而这些数量取决于RDD(从JDBC连接中读取(有多少分区,分区的数量取决于表、查询、列、条件等。强制改变行为的一种方法是拥有更多的分区、更多的任务、更多的执行器,。。。是通过以下配置:numPartitionspartitionColumnlowerBoundupperBound

  • numPartitions是分区的数量(因此将使用执行器的数量(
  • partitionColumn是一个整数类型列,Spark将使用它来确定分区目标
  • lowerBound是您要读取的partitionColumn的最小值
  • upperBound是您要读取的partitionColumn的最大值

您可以在此处阅读更多信息https://stackoverflow.com/a/41085557/3441510,但基本思想是,您希望使用合理数量的执行器(由numPartitions定义(,为每个执行器(由partitionColumnlowerBoundupperBound定义(处理均匀分布的数据块

最新更新