连接一个小的内存中的spark表和一个大的hive表,而不把所有的记录从hive到spark



Usecase:我在spark中有一个小表(~1000行),和一个巨大的hive表(200亿条记录)。我们称小桌子为基础,称大桌子为主。现在,基表有一个列'id',我需要从主表中获取所有记录,其中main.external_id等于base.id。external_id和id列只有唯一的值

最明显的方法是在spark中将基表注册为临时表,并使用类似

的内容。
sparkSession.sql("select * from base_table JOIN main_table ON base_table.id = main_table.external_id")

然而,这意味着spark将从巨大的hive表中获取所有行,并将其带到内存中,考虑到我们只需要大约1000行,我觉得这是非常昂贵的。我正在寻找一种方法来减少这种网络数据传输。

What I have try

  1. 分区/用桶装:这是我们想到的第一个选项,但这两个选项都不可行,因为当列具有离散值(如城市/国家)而'id'列是唯一键列时,分区会更好。对于存储桶,问题是我们需要创建大量的存储桶,这意味着大量的文件可能会产生一些问题。

  2. JDBC query via Hiveserver2:到目前为止,我们可以通过JDBC驱动程序在hive引擎上执行读取查询。我想知道是否有一种方法可以将基本表从spark发送到hive引擎并在那里执行广播连接,这样网络shuffle只涉及较小的表,而我们不需要将较大的表带到spark内存中。但是,我还没有找到任何可以帮助实现这一点的东西。

(显然我们可以先写基本表到hive,然后再做连接,但根据我从团队得到的信息,hive写不是很有效的性能,并且在过去造成了一些问题)

对于我上面提到的问题,有人有什么解决办法吗?或者是否有其他方法可以达到这个结果?p。S:我使用的是spark 2.3.2, spark-sql, spark-hive和hive-jdbc jar的版本是一样的。

如果只需要主表值,则"in"子句可以使用:

val ids = base_table.select("id").as(Encoders.INT).collect().mkString(",")
sparkSession.sql(s"select * from  main_table where external_id in ($ids)")

最新更新