在根据 Spark 中的列值从 hive/hdfs 读取数据时对数据进行分区



我有 2 个火花数据帧,我使用sqlContext从 hive 读取。让我们将这些数据帧称为df1df2。两个数据帧中的数据在配置单元级别称为PolicyNumberColumn上进行排序。PolicyNumber也恰好是两个数据帧的主键。以下是两个数据帧的示例值,尽管实际上,我的两个数据帧都很大,并且作为 5 个分区分布在 5 个执行器中。为了简单起见,我假设每个分区都有一条记录。

Sample df1 PolicyNumber FirstName 1 A 2 B 3 C 4 D 5 E

Sample df2 PolicyNumber PremiumAmount 1 450 2 890 3 345 4 563 5 2341

现在,我想加入df1并在PolicyNumber专栏上df2。我可以运行下面的代码段并获得所需的输出。

df1.join(df2,df1.PolicyNumber=df2.PolicyNumber)

现在,我想尽可能避免随机播放,以使此联接高效。因此,为了避免混乱,在从 hive 读取时,我想根据PolicyNumberColumn的值对df1进行分区,这样PolicyNumber 1行将转到Executor 1,具有PolicyNumber 2的行将转到Executor 2,具有PolicyNumber 3的行将转到Executor 3等等。我也想以与df1完全相同的方式划分df2

这样,Executor 1现在将具有df1行与PolicyNumber=1以及来自df2行与PolicyNumber=1。 同样,Executor 2将具有带有PolicyNumber=2df1行,以及带有PolicyNumber=2ans 的df2行,依此类推。

这样,就不需要像现在这样进行任何随机排序,数据是该执行器的本地数据。

我的问题是,有没有办法控制这种粒度的分区?如果是,我该怎么做。

不幸的是,没有对浮动到每个执行器中的数据的直接控制,但是,当您将数据读入每个数据帧时,请使用CLUSTER BYon join 列,该列有助于将数据排序到正确的执行器中。

ex: 
df1 = sqlContext.sql("select * from CLSUTER BY JOIN_COLUMN")
df2 = sqlContext.sql("SELECT * FROM TABLE2 CLSUTER BY JOIN_COLUMN")

希望对您有所帮助。