我正在尝试使用PySpark从RDS MySQL实例中读取一个表。这是一个巨大的表,因此我想通过使用分区概念来并行化读取操作。该表没有用于查找分区数的数字列。相反,它有一个时间戳列(即日期时间类型(。
我通过检索时间戳列的最小值和最大值找到了下限和上限。然而,我不确定是否有一个标准的公式可以动态地计算分区的数量。以下是我目前正在做的事情(对numPartititons参数的值进行硬编码(:
select_sql = "SELECT {} FROM {}".format(columns, table)
partition_info = {'partition_column': 'col1',
'lower_bound': '<result of min(col1)>',
'upper_bound': '<result of max(col1)>',
'num_partitions': '10'}
read_df = spark.read.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("dbtable", select_sql)
.option("user", user)
.option("password", password)
.option("useSSL", False)
.option("partitionColumn", partition_info['partition_column'])
.option("lowerBound", partition_info['lower_bound']))
.option("upperBound", partition_info['upper_bound']))
.option("numPartitions", partition_info['num_partitions'])
.load()
请给我一个可行的解决方案/你的方法。感谢
如何设置numPartitions
取决于集群的定义。这里没有对错或自动设置。只要你了解partitionColumn
、lowerBound
、upperBound
、numPartitions
背后的逻辑,以及可能的许多基准测试,你就可以决定什么是正确的数字。
Pyspark-df.cache((.count((运行需要很长时间
partitionColumn、lowerBound、upperBound和numPartitions参数的含义是什么?