我有以下任务:
- 从多个模式的一个表加载数据
- 使用PySpark
- 使用一个可以访问数据库中所有模式的用户
我正在使用以下代码(或多或少):
def connect_to_oracle_db(spark_session, db_query):
return spark_session.read
.format("jdbc")
.option("url", "jdbc:oracle:thin:@//<host>:<port>/<srvice_name")
.option("user", "<user>")
.option("password", "<pass>")
.option("dbtable", db_query)
.option("driver", "oracle.jdbc.driver.OracleDriver")
def run(self):
all_schemes = <list of all available schemes>
for str_schema in all_schemes:
db_query = "(Select * from " + str_schema + ".TABLE1) TABLE1_DATA"
df_table1 = slef.connect_to_oracle_db(db_query).load()
# process df_table1
大约有300个方案,而且速度相当慢,因为每次迭代都会创建并关闭新的连接。我想找到一种方法来重用现有的连接或以某种方式创建连接池。这对我来说似乎很无效。
你知道如何重用PySpark的连接或创建连接池吗?
在像Spark这样的分布式系统中,没有传统意义上的连接池。您必须记住,每个分区都可以由不同的物理节点、不同的逻辑容器(如果适用于给定的集群管理器)以及不同的JVM来处理。
在这种情况下,连接池并不能起到真正的作用。由于Spark旨在进行大规模导入,因此单个连接的利用率已经相当高。
然而,这里有不同的可见问题(可能还有其他问题,从代码片段中看并不明显,因为您显示的代码实际上并没有获取数据):
-
您没有配置
fetchsize
,因此将使用特定驱动程序的默认值。对于Oracle来说,它是10,完全不适合大规模处理return spark_session.read .format("jdbc") .option("fetchsize", some_reasonable_value) ...
-
您没有配置分区,所以Spark将只使用一个分区来处理所有数据。您可以在从JDBC源迁移数据时如何优化分区中阅读可能的解决方案?
-
您已经将其建模为一个顺序过程。除非数据集以某种方式在下游组合,否则最好为每个表提交一个单独的作业,并让调度器根据可用资源优化。
您还可以考虑在单个应用程序中并行处理表
只是重申一下-Spark是懒惰的,所以核心问题可能在其他地方,而上面列出的问题可能是次要的。