我目前正在尝试加载Parquet文件到Postgres数据库。Parquet文件已经定义了模式,我想把这个模式延续到Postgres表上。
我没有在Postgres中定义任何模式或表。但是我希望加载过程在读取时自动推断模式并创建一个表,然后将SparkSQL数据框加载到该表中。
下面是我的代码:import findspark
findspark.init()
from pyspark.sql import SparkSession
appName = "load_parquet"
master = "local"
spark = SparkSession.builder
.master(master)
.appName(appName)
.getOrCreate()
将Parquet数据作为Spark数据帧读取
customers_sdf = spark.read.parquet('/home/jovyan/filesystem/customers.parquet')
检查模式是否正确
customers_sdf.printSchema()
root
|-- customer_id: string (nullable = true)
|-- customer_unique_id: string (nullable = true)
|-- customer_zip_code_prefix: string (nullable = true)
|-- customer_city: string (nullable = true)
|-- customer_state: string (nullable = true)
将SparkSQL数据框写入Postgres
customers_sdf.write
.jdbc(
url="jdbc:postgresql:destdb",
table="public.customers",
properties={"user": "destdb1", "password": "destdb1"}
)
我的Postgres容器主机名是postgres-dest
,端口映射是5434:5432
。见下文:
postgres-dest:
image: postgres:latest
environment:
POSTGRES_USER: destdb1
POSTGRES_PASSWORD: destdb1
POSTGRES_DB: destdb
logging:
options:
max-size: 10m
max-file: "3"
ports:
- "5434:5432"
healthcheck:
test: ["CMD", "pg_isready", "-U", "destdb1"]
interval: 5s
retries: 5
restart: always
pyspark-notebook:
build: .
image: jupyter/pyspark-notebook:latest
environment:
JUPYTER_ENABLE_LAB: 'yes'
ports:
- "8889:8889"
- "4040-4080:4040-4080"
volumes:
- ./notebooks:/home/jovyan/work/notebooks
- ./filesystem:/home/jovyan/filesystem
我尝试将数据框写入Postgres,如前所示,但我得到这个错误:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/tmp/ipykernel_101/2914557055.py in <module>
----> 1 customers_sdf.write
2 .jdbc(url="jdbc:postgresql://postgres-dest/destdb", table="public.customers", properties={"user": "destdb1", "password": "destdb1"})
/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, mode, properties)
1443 for k in properties:
1444 jprop.setProperty(k, properties[k])
-> 1445 self.mode(mode)._jwrite.jdbc(url, table, jprop)
1446
1447
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o231.jdbc.
: java.sql.SQLException: No suitable driver
at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:817)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
注意:我是Spark的绝对初学者,所以请像我5岁一样解释。
将url
改为jdbc:postgresql://postgres-dest:5432/destdb
并确保在类路径中存在PostgreSQL驱动jar。你可以从这里下载jar。