SparkSQL:如何在从数据库加载数据集时指定分区列



我使用Spark 2.3,并使用jdbc从MySQL加载数据,如下所示

val dataSet:Dataset[Row] = _spark
.read
.format("jdbc")
.options(Map("url" -> jdbcUrl
,"user" -> username
,"password" -> password
,"dbtable" -> dataSourceTableName
,"driver" -> driver
))
.load() 

我想根据表中的特定列对数据集进行分区。我怎样才能做到这一点?

您需要指定partitionColumnupperBoundlowerBoundnumPartitions选项。

这些在spark-sql的JDBC文档中的属性表中进行了描述。

如果指定了其中任何选项,则必须全部指定这些选项。在里面此外,必须指定numPartitions。他们描述了如何当从多个工作线程并行读取时,对表进行分区。partitionColumn必须是有问题的表格。请注意,刚刚使用了lowerBoundupperBound决定分区步长,而不是筛选表中的行。因此,表中的所有行都将被分区并返回。此选项仅适用于阅读。

有关upperBoundlowerBound参数的进一步解释,请访问PIYUSH PASARI的答案。

他给出了以下使用以下参数值生成的查询示例

upperBound = 500lowerBound = 0numPartitions = 5

SELECT * FROM table WHERE partitionColumn < 100 or partitionColumn is null
SELECT * FROM table WHERE partitionColumn >= 100 AND <200 
SELECT * FROM table WHERE partitionColumn >= 200 AND <300
SELECT * FROM table WHERE partitionColumn >= 300 AND <400
...
SELECT * FROM table WHERE partitionColumn >= 400

这可以从JDBCRelation.scale.中的代码中看到

正如您所看到的,所有的行都被提取了,但如果您的上限和下限没有覆盖整个数据范围,那么第一个和最后一个分区可能会比其他分区大。如果你不能确定上限和下限,想要偶数分割,并且不关心得到每一行,你可以总是在dbtable参数中设置上限和下限作为条件。

spark.read("jdbc")
.option("url", url)
.option("dbtable", "pets")
.option("user", user)
.option("password", password)
.option("numPartitions", 10)
.option("partitionColumn", "owner_id")
.option("lowerBound", 1)
.option("upperBound", 10000)

阅读更多关于以下链接

  • https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3

  • http://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

相关内容

  • 没有找到相关文章

最新更新