Pyspark-重用JDBC连接



我有以下任务:

  • 从多个模式的一个表加载数据
  • 使用PySpark
  • 使用一个可以访问数据库中所有模式的用户

我正在使用以下代码(或多或少):

def connect_to_oracle_db(spark_session, db_query):
return spark_session.read 
.format("jdbc") 
.option("url", "jdbc:oracle:thin:@//<host>:<port>/<srvice_name") 
.option("user", "<user>") 
.option("password", "<pass>") 
.option("dbtable", db_query) 
.option("driver", "oracle.jdbc.driver.OracleDriver") 
def run(self):
all_schemes  = <list of all available schemes>
for str_schema in all_schemes:
db_query = "(Select * from " + str_schema + ".TABLE1) TABLE1_DATA"
df_table1 = slef.connect_to_oracle_db(db_query).load()
# process df_table1

大约有300个方案,而且速度相当慢,因为每次迭代都会创建并关闭新的连接。我想找到一种方法来重用现有的连接或以某种方式创建连接池。这对我来说似乎很无效。

你知道如何重用PySpark的连接或创建连接池吗?

在像Spark这样的分布式系统中,没有传统意义上的连接池。您必须记住,每个分区都可以由不同的物理节点、不同的逻辑容器(如果适用于给定的集群管理器)以及不同的JVM来处理。

在这种情况下,连接池并不能起到真正的作用。由于Spark旨在进行大规模导入,因此单个连接的利用率已经相当高。

然而,这里有不同的可见问题(可能还有其他问题,从代码片段中看并不明显,因为您显示的代码实际上并没有获取数据):

  • 您没有配置fetchsize,因此将使用特定驱动程序的默认值。对于Oracle来说,它是10,完全不适合大规模处理

    return spark_session.read 
    .format("jdbc")
    .option("fetchsize", some_reasonable_value)
    ...
    
  • 您没有配置分区,所以Spark将只使用一个分区来处理所有数据。您可以在从JDBC源迁移数据时如何优化分区中阅读可能的解决方案?

  • 您已经将其建模为一个顺序过程。除非数据集以某种方式在下游组合,否则最好为每个表提交一个单独的作业,并让调度器根据可用资源优化。

    您还可以考虑在单个应用程序中并行处理表

只是重申一下-Spark是懒惰的,所以核心问题可能在其他地方,而上面列出的问题可能是次要的。

相关内容

  • 没有找到相关文章

最新更新