我有一个非常简单的SparkSQL连接到Postgres DB的设置,我试图从表中获得一个DataFrame,该DataFrame具有多个分区(让我们说2)。代码如下:
Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
options.put("dbtable", "select ID, OTHER from TABLE limit 1000");
options.put("partitionColumn", "ID");
options.put("lowerBound", "100");
options.put("upperBound", "500");
options.put("numPartitions","2");
DataFrame housingDataFrame = sqlContext.read().format("jdbc").options(options).load();
由于某种原因,DataFrame的一个分区包含了几乎所有的行。
我能理解的是lowerBound/upperBound
是用于微调的参数。在SparkSQL的文档(Spark 1.4.0 - Spark - sql_type .11)中说它们是用来定义跨距的,而不是用来过滤/划分分区列的。但这也提出了几个问题:
- 步幅是频率(每个查询返回的元素数量),Spark将为每个执行器(分区)查询DB ?
- 如果不是,这些参数的目的是什么,它们依赖于什么,以及我如何以稳定的方式平衡我的DataFrame分区(不要求所有分区包含相同数量的元素,只是有一个平衡-例如2个分区100个元素55/45,60/40甚至65/35会做)
似乎找不到这些问题的明确答案,我想知道你们中的一些人是否可以为我澄清这一点,因为现在在处理X万行时影响了我的集群性能,所有繁重的工作都集中在一个执行器上。
谢谢你的时间。
基本上,下限和上限以及分区的数量用于计算每个并行任务的增量或分割。
假设这个表有分区列"year",数据从2006年到2016年。
如果您将分区数量定义为10,下界为2006,上界为2016,您将使每个任务获取自己年份的数据-这是理想的情况。
即使您错误地指定了下限和/或上限,例如设置lower = 0和upper = 2016,也会在数据传输中出现倾斜,但是,您不会"丢失"或无法检索任何数据,因为:
第一个任务将获取年份<</p> 0。
第二个任务将获取0年到2016/10年之间的数据。
第三个任务将获取2016/10和2*2016/10之间的年份的数据。
…
最后一个任务将有一个where条件year->2016。
t .
下界确实用于分区列;参考这段代码(写这篇文章时的当前版本):
https://github.com/apache/spark/blob/40ed2af587cedadc6e5249031857a922b3b234ca/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala函数columnPartition
包含了分区逻辑和下限/上限使用的代码。
下界和上界目前已经确定了它们在前面的答案中所做的事情。接下来的问题是如何在不查看最小最大值或数据严重倾斜的情况下平衡分区间的数据。
如果你的数据库支持"哈希"函数,它可以做到这一点。
partitionColumn = "hash(column_name)%num_partitions"
numPartitions = 10//任意设置
lowerBound = 0
upperBound = numPartitions
只要模数运算返回一个均匀分布在[0,numPartitions)上