Error when changing the partition field of an Iceberg table from Spark



We are writing to Iceberg using Spark, and when we rename the partition field we get a validation error:

org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: some_date: void(1)

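It looks like Iceberg is still referring to the existing table's partition field name, which should no longer be relevant, since there is a new partition field and the write mode is "overwrite".

Any suggestions? Thanks a lot.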

Here is a minimal reproducible example:

Create the original table with the partition field "some_date":

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

dataDF = [('1991-04-01',)]
schema = StructType([
    StructField('some_date', StringType(), True)])
spark = SparkSession.builder.master('local[1]').appName('example') \
    .getOrCreate()
df = spark.createDataFrame(data=dataDF, schema=schema)
spark.sql("use iprod")  # catalog
spark.sql("CREATE SCHEMA IF NOT EXISTS iprod.test_schema")
df.write.mode("overwrite").format("parquet").partitionBy('some_date').saveAsTable("iprod.test_schema.example")

Then try to overwrite the table with the same code, but with the partition field renamed to some_date_2:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

dataDF = [('1991-04-01',)]
schema = StructType([
    StructField('some_date_2', StringType(), True)])
spark = SparkSession.builder.master('local[1]').appName('example') \
    .getOrCreate()
df = spark.createDataFrame(data=dataDF, schema=schema)
spark.sql("use iprod")  # catalog
spark.sql("CREATE SCHEMA IF NOT EXISTS iprod.test_schema")
df.write.mode("overwrite").format("parquet").partitionBy('some_date_2').saveAsTable("iprod.test_schema.example")

Full trace:

: org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: some_date: void(1)
    at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:46)
    at org.apache.iceberg.PartitionSpec.checkCompatibility(PartitionSpec.java:511)
    at org.apache.iceberg.PartitionSpec$Builder.build(PartitionSpec.java:503)
    at org.apache.iceberg.TableMetadata.removePartitionIds(TableMetadata.java:768)
    at org.apache.iceberg.TableMetadata.buildReplacement(TableMetadata.java:790)
    at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.newReplaceTableTransaction(BaseMetastoreCatalog.java:256)
    at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.createOrReplaceTransaction(BaseMetastoreCatalog.java:244)
    at org.apache.iceberg.CachingCatalog$CachingTableBuilder.createOrReplaceTransaction(CachingCatalog.java:244)
    at org.apache.iceberg.spark.SparkCatalog.stageCreateOrReplace(SparkCatalog.java:190)
    at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:197)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:686)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:619)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

This error occurs because your table uses version 1 of the Iceberg table format.
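
To confirm which format version a table is on, one option is to look at the `format-version` field at the top level of the table's current metadata JSON file. The snippet below is only a sketch: `read_format_version` is a hypothetical helper, and the sample string is a made-up, heavily trimmed stand-in for a real metadata file that you would read from the table's metadata directory.

```python
import json


def read_format_version(metadata_json: str) -> int:
    """Return the format version recorded in Iceberg table metadata JSON."""
    metadata = json.loads(metadata_json)
    # "format-version" is a top-level integer field in the metadata file.
    return int(metadata["format-version"])


# Hypothetical, trimmed metadata content used only for illustration.
sample = '{"format-version": 1, "location": "/tmp/warehouse/test_schema/example"}'
print(read_format_version(sample))
```

If this reports 1 for the table in question, the upgrade described next applies.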

You should upgrade the table to version 2 (the format-version table property). AFAIK, this can be done via SQL:

ALTER TABLE catalog.ns.table
SET TBLPROPERTIES (
'format-version' = '2'
)

It can also be done with the DataFrame API v2. Something like:

df.writeTo('catalog.ns.table').using("iceberg").tableProperty("format-version", "2").createOrReplace()

You can read more about the Iceberg table format in the spec (it includes a summary of the changes between version 1 and version 2).

If you want to stick with version 1, you should DROP and then re-ADD the partition field (via ALTER TABLE).
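
The DROP and re-ADD route could look roughly like the sketch below. It only builds the SQL strings; `swap_partition_field_sql` is a hypothetical helper, not part of Spark or Iceberg. It assumes the Iceberg Spark SQL extensions are enabled on the session (they provide `ALTER TABLE ... ADD/DROP PARTITION FIELD`) and that the new column, here `some_date_2`, already exists in the table schema.

```python
from typing import List


def swap_partition_field_sql(table: str, old_field: str, new_field: str) -> List[str]:
    """Build the ALTER TABLE statements that drop one partition field and add another."""
    return [
        f"ALTER TABLE {table} DROP PARTITION FIELD {old_field}",
        f"ALTER TABLE {table} ADD PARTITION FIELD {new_field}",
    ]


# Table and field names are taken from the question above.
statements = swap_partition_field_sql("iprod.test_schema.example", "some_date", "some_date_2")
for stmt in statements:
    print(stmt)  # with a live session, run spark.sql(stmt) instead
```

Running the two statements in this order keeps the table on version 1 and changes only the partition spec; the existing data files are not rewritten.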
