Load a SparkSQL DataFrame into a Postgres database with an automatically defined schema



I am currently trying to load a Parquet file into a Postgres database. The Parquet file already has a schema defined, and I would like to carry that schema over to the Postgres table.

I have not defined any schema or table in Postgres. Instead, I want the load process to infer the schema automatically on read, create a table from it, and then load the SparkSQL DataFrame into that table.

Here is my code:

import findspark
findspark.init()
from pyspark.sql import SparkSession

appName = "load_parquet"
master = "local"

# Build a local Spark session
spark = SparkSession.builder \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

Read the Parquet data as a Spark DataFrame:

customers_sdf = spark.read.parquet('/home/jovyan/filesystem/customers.parquet')

Check that the schema is correct:

customers_sdf.printSchema()
root
|-- customer_id: string (nullable = true)
|-- customer_unique_id: string (nullable = true)
|-- customer_zip_code_prefix: string (nullable = true)
|-- customer_city: string (nullable = true)
|-- customer_state: string (nullable = true)

Write the SparkSQL DataFrame to Postgres:

customers_sdf.write \
    .jdbc(
        url="jdbc:postgresql:destdb",
        table="public.customers",
        properties={"user": "destdb1", "password": "destdb1"}
    )

My Postgres container's hostname is postgres-dest, and its port mapping is 5434:5432. See below:

postgres-dest:
  image: postgres:latest
  environment:
    POSTGRES_USER: destdb1
    POSTGRES_PASSWORD: destdb1
    POSTGRES_DB: destdb
  logging:
    options:
      max-size: 10m
      max-file: "3"
  ports:
    - "5434:5432"
  healthcheck:
    test: ["CMD", "pg_isready", "-U", "destdb1"]
    interval: 5s
    retries: 5
  restart: always

pyspark-notebook:
  build: .
  image: jupyter/pyspark-notebook:latest
  environment:
    JUPYTER_ENABLE_LAB: 'yes'
  ports:
    - "8889:8889"
    - "4040-4080:4040-4080"
  volumes:
    - ./notebooks:/home/jovyan/work/notebooks
    - ./filesystem:/home/jovyan/filesystem
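As a quick reference, here is a minimal sketch of the two JDBC URL forms this port mapping implies, using the service name and ports from the compose file above:

# From another container on the same compose network (e.g. pyspark-notebook),
# use the service name and the container port 5432:
url_in_network = "jdbc:postgresql://postgres-dest:5432/destdb"

# From the Docker host itself, use localhost and the published port 5434:
url_from_host = "jdbc:postgresql://localhost:5434/destdb"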

I tried writing the DataFrame to Postgres as shown above, but I get this error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_101/2914557055.py in <module>
----> 1 customers_sdf.write \
2     .jdbc(url="jdbc:postgresql://postgres-dest/destdb", table="public.customers", properties={"user": "destdb1", "password": "destdb1"})
/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, mode, properties)
1443         for k in properties:
1444             jprop.setProperty(k, properties[k])
-> 1445         self.mode(mode)._jwrite.jdbc(url, table, jprop)
1446 
1447 
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302 
1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
1305             answer, self.gateway_client, self.target_id, self.name)
1306 
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
109     def deco(*a, **kw):
110         try:
--> 111             return f(*a, **kw)
112         except py4j.protocol.Py4JJavaError as e:
113             converted = convert_exception(e.java_exception)
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
328                     format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o231.jdbc.
: java.sql.SQLException: No suitable driver
at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:817)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)

Note: I am an absolute beginner with Spark, so please explain it like I'm five.

Change the url to jdbc:postgresql://postgres-dest:5432/destdb

and make sure the PostgreSQL driver jar is on the classpath. You can download the jar from the PostgreSQL JDBC driver site (https://jdbc.postgresql.org/download/).
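Putting both fixes together, here is a minimal sketch of the corrected load; the jar path and driver version shown are assumptions, so adjust them to wherever you placed the PostgreSQL JDBC jar:

import findspark
findspark.init()
from pyspark.sql import SparkSession

# The jar path and version below are assumptions -- point spark.jars at your
# copy of the PostgreSQL JDBC driver. Alternatively, spark.jars.packages can
# fetch it from Maven at startup if the container has internet access, e.g.
# .config("spark.jars.packages", "org.postgresql:postgresql:42.2.24")
spark = SparkSession.builder \
    .master("local") \
    .appName("load_parquet") \
    .config("spark.jars", "/home/jovyan/filesystem/postgresql-42.2.24.jar") \
    .getOrCreate()

customers_sdf = spark.read.parquet('/home/jovyan/filesystem/customers.parquet')

# Use the compose service name and the container port (5432), not the
# published host port (5434), because the notebook container reaches Postgres
# over the compose network. Naming the driver class explicitly avoids the
# "No suitable driver" lookup failure.
customers_sdf.write \
    .jdbc(
        url="jdbc:postgresql://postgres-dest:5432/destdb",
        table="public.customers",
        properties={
            "user": "destdb1",
            "password": "destdb1",
            "driver": "org.postgresql.Driver"
        }
    )

With the default save mode, Spark creates public.customers from the DataFrame's schema if the table does not already exist, which gives you the automatic schema carry-over described above.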
