我有 2 个火花数据帧,我使用sqlContext
从 hive 读取。让我们将这些数据帧称为df1
和df2
。两个数据帧中的数据在配置单元级别称为PolicyNumber
的Column
上进行排序。PolicyNumber
也恰好是两个数据帧的主键。以下是两个数据帧的示例值,尽管实际上,我的两个数据帧都很大,并且作为 5 个分区分布在 5 个执行器中。为了简单起见,我假设每个分区都有一条记录。
Sample df1
PolicyNumber FirstName
1 A
2 B
3 C
4 D
5 E
Sample df2
PolicyNumber PremiumAmount
1 450
2 890
3 345
4 563
5 2341
现在,我想加入df1
并在PolicyNumber
专栏上df2
。我可以运行下面的代码段并获得所需的输出。
df1.join(df2,df1.PolicyNumber=df2.PolicyNumber)
现在,我想尽可能避免随机播放,以使此联接高效。因此,为了避免混乱,在从 hive 读取时,我想根据PolicyNumber
Column
的值对df1
进行分区,这样PolicyNumber 1
行将转到Executor 1
,具有PolicyNumber 2
的行将转到Executor 2
,具有PolicyNumber 3
的行将转到Executor 3
等等。我也想以与df1
完全相同的方式划分df2
。
这样,Executor 1
现在将具有df1
行与PolicyNumber=1
以及来自df2
行与PolicyNumber=1
。 同样,Executor 2
将具有带有PolicyNumber=2
的df1
行,以及带有PolicyNumber=2
ans 的df2
行,依此类推。
这样,就不需要像现在这样进行任何随机排序,数据是该执行器的本地数据。
我的问题是,有没有办法控制这种粒度的分区?如果是,我该怎么做。
不幸的是,没有对浮动到每个执行器中的数据的直接控制,但是,当您将数据读入每个数据帧时,请使用CLUSTER BY
on join 列,该列有助于将数据排序到正确的执行器中。
ex:
df1 = sqlContext.sql("select * from CLSUTER BY JOIN_COLUMN")
df2 = sqlContext.sql("SELECT * FROM TABLE2 CLSUTER BY JOIN_COLUMN")
希望对您有所帮助。