Overwriting a table with a Spark DataFrame fails when the table already exists



I am trying to completely overwrite a Postgres table with a Spark DataFrame. For some reason, even though I specify mode("overwrite"), I get a relation already exists error from Postgres. Why doesn't my code overwrite the data in the database as expected? I have checked the table with a client: it does exist (which should not matter) and it contains data. What is going wrong? Could this be a memory problem? Could it be queryTimeout?

df.write.format('jdbc').options(
    url=PSQL_URL_SPARK,
    driver=SPARK_ENV['PSQL_DRIVER'],
    dbtable="schema.table",
    user=SPARK_ENV['PSQL_USER'],
    password=SPARK_ENV['PSQL_PASS'],
    batchsize=2000000,
    queryTimeout=690
).mode("overwrite").save()

Traceback (most recent call last):
File "/home/hadoop/spark_script.py", line 671, in <module>
main()
File "/home/hadoop/spark_script.py", line 83, in main
).mode("overwrite").save()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 732, in save
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o352.save.
: org.postgresql.util.PSQLException: ERROR: relation "<table>" already exists
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2468)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2211)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:309)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:446)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:370)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:311)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:297)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:274)
at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:246)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:859)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

I ran into the same problem, and it turned out to come from the database schema. Make sure the columns and types of the table in the database match those of the DataFrame.

You can write the DataFrame into a new temporary table and use DESCRIBE in your SQL engine to look at the columns and types of both tables. You can then try overwriting the temporary table again to see whether Spark successfully writes data into an existing table.
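A minimal sketch of that check, reusing df and the connection settings from the question; scratch.df_dump is only a hypothetical name for the temporary table, and the information_schema query is a Postgres-friendly equivalent of DESCRIBE:

# Let Spark create a throwaway table so you can see which column types it derives
df.write.format('jdbc').options(
    url=PSQL_URL_SPARK,
    driver=SPARK_ENV['PSQL_DRIVER'],
    dbtable="scratch.df_dump",   # hypothetical temporary table
    user=SPARK_ENV['PSQL_USER'],
    password=SPARK_ENV['PSQL_PASS']
).mode("overwrite").save()

# Then, in a SQL client, compare it column by column with the real target:
#   SELECT column_name, data_type FROM information_schema.columns
#   WHERE table_schema = 'scratch' AND table_name = 'df_dump';
#   SELECT column_name, data_type FROM information_schema.columns
#   WHERE table_schema = 'schema' AND table_name = 'table';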

Another possible problem is permissions. Check the user's privileges on the table:

SELECT grantee, privilege_type 
FROM information_schema.role_table_grants 
WHERE table_name='mytable';

The problem does not seem to be mode("overwrite"). It happens in save(), but it also seems odd that Spark is trying to create the table:

...
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:859)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
...

Is the table name specified correctly? I wonder whether it could be a Spark bug (I don't know Spark well enough to say): maybe it tries to create the table as public.tablename (because schema.tablename doesn't resolve, which is how I imagine the error manifesting itself) even though you specified schema.tablename.
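If Spark really is dropping the relation and then failing on its own CREATE TABLE, one thing worth trying (just a sketch with the question's settings, not a confirmed fix) is the JDBC truncate write option, which makes mode("overwrite") empty and reuse the existing table instead of re-creating it:

# With truncate, overwrite issues TRUNCATE TABLE instead of DROP + CREATE TABLE,
# so the existing schema-qualified relation and its column types are kept as-is
df.write.format('jdbc').options(
    url=PSQL_URL_SPARK,
    driver=SPARK_ENV['PSQL_DRIVER'],
    dbtable="schema.table",
    user=SPARK_ENV['PSQL_USER'],
    password=SPARK_ENV['PSQL_PASS'],
    truncate="true"
).mode("overwrite").save()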
