Combining delta.io and Excel reading



When using com.crealytics:spark-excel_2.12:0.14.0 without Delta:

spark = (
    SparkSession.builder.appName("Word Count")
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")
    .getOrCreate()
)

df = (
    spark.read.format("com.crealytics.spark.excel")
    .option("header", "true")
    .load(path2)
)

it works and I can read the Excel file just fine. But with configure_spark_with_delta_pip:

# Create the session
builder = (
    SparkSession.builder.appName("transaction")
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

the following error is raised:

Py4JJavaError: An error occurred while calling o139.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.crealytics.spark.excel. Please find packages at http://spark.apache.org/third-party-projects.html
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:692)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:746)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.crealytics.spark.excel.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:666)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:666)
	at scala.util.Try.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
	... 14 more

Why does this happen, and how can I avoid it?

You get this error because configure_spark_with_delta_pip overwrites/replaces your spark.jars.packages configuration property with the Delta Lake package it needs to import. As a consequence, your package com.crealytics:spark-excel_2.12:0.14.0 is no longer available/imported. See this snippet from its source code:

scala_version = "2.12"
maven_artifact = f"io.delta:delta-core_{scala_version}:{delta_version}"
return spark_session_builder.config("spark.jars.packages", maven_artifact) 
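
In other words, the last .config call for a given key wins. Below is a minimal sketch illustrating this; the Delta coordinate io.delta:delta-core_2.12:1.0.0 is only an illustrative value, not necessarily the version you have installed:

from pyspark.sql import SparkSession

# Setting the same key twice on one builder: the second call replaces the first,
# which mirrors how configure_spark_with_delta_pip re-sets "spark.jars.packages".
builder = (
    SparkSession.builder.appName("config-overwrite-demo")
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")  # overwrites the line above
)

spark = builder.getOrCreate()
print(spark.conf.get("spark.jars.packages"))  # only the Delta coordinate survives
spark.stop()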

Unfortunately, at this time the Builder does not let us retrieve the already specified configuration properties or the SparkConf object, so these properties cannot be adjusted dynamically before getOrCreate creates the Spark session.

Approach 1

To work around this, you can assemble the appropriate Delta package yourself, similar to what configure_spark_with_delta_pip does, for example:


import importlib_metadata

delta_version = importlib_metadata.version("delta_spark")
scala_version = "2.12"
delta_package = f"io.delta:delta-core_{scala_version}:{delta_version}"

builder = (
    SparkSession.builder.appName("transaction")
    .config("spark.jars.packages", f"com.crealytics:spark-excel_2.12:0.14.0,{delta_package}")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = builder.getOrCreate()
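
With both coordinates on spark.jars.packages, the single session can read the Excel file and work with Delta tables. A quick usage sketch, with purely hypothetical input and output paths:

# Hypothetical paths, only for illustration.
excel_path = "/data/transactions.xlsx"
delta_path = "/data/delta/transactions"

df = (
    spark.read.format("com.crealytics.spark.excel")
    .option("header", "true")
    .load(excel_path)
)

# Persist the Excel contents as a Delta table and read them back.
df.write.format("delta").mode("overwrite").save(delta_path)
spark.read.format("delta").load(delta_path).show()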

Approach 2

Alternatively, you can first create the Spark session with the Delta packages applied by configure_spark_with_delta_pip. After that, you can trigger a re-initialization of the Spark session with the updated configuration properties.

builder = (
    SparkSession.builder.appName("transaction")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

builder = (
    SparkSession.builder.appName("transaction")
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")
)
spark = builder.getOrCreate()
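
As a sanity check, one could inspect the configuration of the session returned by the second getOrCreate to confirm that the spark-excel coordinate was applied alongside the Delta settings (assuming the behaviour quoted below); a small sketch:

# If getOrCreate returned the existing session and applied the new option,
# both the Delta and the spark-excel settings should be visible on it.
print(spark.conf.get("spark.sql.extensions"))  # io.delta.sql.DeltaSparkSessionExtension
print(spark.conf.get("spark.jars.packages"))   # com.crealytics:spark-excel_2.12:0.14.0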

Since both Spark sessions use the same appName, getOrCreate will retrieve the existing Spark session, but it will also apply the new configuration options. This behaviour is documented here:

In case an existing SparkSession is returned, the config options specified in this builder will be applied to the existing SparkSession.

>>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
>>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
>>> s1.conf.get("k1") == s2.conf.get("k1")
True
>>> s1.conf.get("k2") == s2.conf.get("k2")
True

Let me know if this works for you.
