读取镶木地板文件时刷新Dataframe的元数据



我正在尝试读取一个镶木地板文件作为一个数据帧,该文件将定期更新(路径为/folder_name。每当有新数据出现时,旧镶木地板的文件路径(/folder_name)将被重命名为临时路径,然后我们将新数据和旧数据合并,并将存储在旧路径中(/folder_name)

假设我们在更新之前有一个拼花地板文件hdfs://folder_name/part-xxxx-xxx.snappy.parquet,然后在更新之后它被更改为hdfs://folder_name/part-00000-yyyy-yyy.snappy.parquet

发生的问题是,当我试图在进行更新时读取镶木地板文件

sparksession.read.parquet("filename")=>它采用旧路径hdfs://folder_name/part-xxxx-xxx.snappy.parquet(路径存在)

当在数据帧上调用一个操作时,它正试图从hdfs://folder_name/part-xxxx-xxx.snappy.parquet读取数据,但由于更新,文件名发生了更改,我得到了以下问题

java.io.FileNotFoundException:文件不存在:hdfs://folder_name/part-xxxx-xxx.snappy.parquet基础文件可能已更新。通过在SQL中运行"REFRESH TABLE tableName"命令或重新创建所涉及的数据集/DataFrame,可以显式地使Spark中的缓存无效。

我正在使用Spark 2.2

有人能帮我刷新元数据吗?

当您试图读取不存在的文件时,会发生此错误。

如果我错了,请纠正我,但我怀疑您在保存新数据帧(使用.mode("overwrite"))时覆盖了所有文件。当该进程运行时,您正试图读取一个已删除的文件,而该异常被抛出——这会使表在一段时间内(在更新期间)不可用。

据我所知,没有一种直接的方法可以按照您的意愿"刷新元数据"。

解决这一问题的两种(几种可能的)方法:

1-使用附加模式

如果您只想将新的数据帧附加到旧的数据帧,则无需创建临时文件夹并覆盖旧的文件夹。您只需将保存模式从覆盖更改为附加即可。通过这种方式,您可以将分区添加到现有的Parquet文件中,而不必重写现有的分区。

df.write
.mode("append")
.parquet("/temp_table")

这是迄今为止最简单的解决方案,不需要读取已经存储的数据。然而,如果你必须更新旧数据(例如:如果你正在进行追加销售),这将不起作用。为此,您可以选择2:

2-使用配置单元视图

您可以创建配置单元表,并使用视图指向最新的(可用的)配置单元表。

以下是这种方法背后的逻辑示例:

第1部分

  • 如果视图<table_name>不存在,我们将创建一个名为<table_name>_alpha0存储新数据
  • 创建表格后我们将视图<table_name>创建为select * from <table_name>_alpha0

第2部分

  • 如果视图<table_name>存在,我们需要查看它指向哪个表(<table_name>_alphaN)

  • 您可以对新数据执行所需的所有操作,将其保存为名为<table_name>_alpha(N+1)的表

  • 创建表后,我们将视图<table_name>更改为select * from <table_name>_alpha(N+1)

还有一个代码示例:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import spark.implicits._

//This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')
def getCurrentTable(spark: SparkSession, databaseName:String, tableName: String): Option[String] = {
if(spark.catalog.tableExists(s"${databaseName}.${tableName}")) {
val rdd_desc = spark.sql(s"describe formatted ${databaseName}.${tableName}")
.filter("col_name == 'View Text'")
.rdd
if(rdd_desc.isEmpty()) {
None
}
else {
Option(
rdd_desc.first()
.get(1)
.toString
.toLowerCase
.stripPrefix("select * from ")
)
}
}
else
None
}
//This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2, the next one will be alpha0 again.
def saveDataframe(spark: SparkSession, databaseName:String, tableName: String, new_df: DataFrame, rounds: Int = 3): Unit ={
val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(s"${databaseName}.${tableName}_alpha${rounds-1}")
val nextAlphaTable = currentTable.replace(s"_alpha${currentTable.last}",s"_alpha${(currentTable.last.toInt + 1) % rounds}")
new_df.write
.mode("overwrite")
.format("parquet")
.option("compression","snappy")
.saveAsTable(nextAlphaTable)
spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${nextAlphaTable}")
}
//An example on how to use this:
//SparkSession: spark
val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
val dbName = "test_db"
val tableName = "alpha_test_table"
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
println("Saving dataframe")
saveDataframe(spark, dbName, tableName, df)
println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show
val processed_df = df.unionByName(new_data) //Or other operations you want to do
println("Saving new dataframe")
saveDataframe(spark, dbName, tableName, processed_df)
println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

结果:

Current table: Table does not exist
Saving dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha0
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  1|        I|
|  2|       am|
+---+---------+
Saving new dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha1
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  5|     with|
|  6|      new|
|  7|     data|
|  1|        I|
|  2|       am|
+---+---------+

通过这样做,您可以保证视图<table_name>的某个版本始终可用。这还具有维护表的早期版本的优点(或者不这样做,取决于您的情况)以前版本的<table_name_alpha1>将是<table_name_alpha0>

3-奖金

如果可以选择升级您的Spark版本,请查看Delta Lake(最低Spark版本:2.4.2)

希望这有帮助:)

首先缓存镶木地板,然后进行覆盖。

var tmp = sparkSession.read.parquet("path/to/parquet_1").cache()
tmp.write.mode(SaveMode.Overwrite).parquet("path/to/parquet_1") // same path

由于spark执行延迟求值,因此引发错误。当DAG执行"写入"命令时,它开始读取镶木地板并同时写入/覆盖。

Spark没有像Zookeeper这样的事务管理器来锁定文件,因此并发读/写是一个需要单独处理的挑战。

要刷新目录,您可以执行以下操作:-

spark.catalog.refreshTable("my_table")

spark.sql(s"REFRESH TABLE $tableName")
  1. 一个简单的解决方案是使用df.cache.count首先引入内存,然后与新数据进行并集,并使用模式overwrite写入/folder_name。在这种情况下,您不必使用temp路径。

  2. 您提到要将/folder_name重命名为某个临时路径。因此,您应该从该临时路径读取旧数据,而不是hdfs://folder_name/part-xxxx-xxx.snappy.parquet

示例

从阅读你的问题中,我认为这可能是你的问题,如果是这样的话,你应该能够在不使用DeltaLake的情况下运行你的代码。在下面的用例中,Spark将这样运行代码:(1)加载inputDF,本地存储文件夹位置的文件名[在这种情况下是显式的部分文件名];(2a)到达第2行并覆盖tempLocation内的文件;(2b)从inputDF加载内容并将其输出到tempLocation;(3) 遵循与1相同的步骤,但在tempLocation上;(4a)删除inputLocation文件夹内的文件;以及(4b)尝试加载缓存在1中的部件文件以从inputDF加载数据以运行并集并中断,因为该文件不存在。

val inputDF = spark.read.format("parquet").load(inputLocation)
inputDF.write.format("parquet").mode("overwrite").save(tempLocation)
val tempDF = spark.read.foramt("parquet").load(tempLocation)
val outputDF = inputDF.unionAll(tempDF)
outputDF.write.format("parquet").mode("overwrite").save(inputLocation)

根据我的经验,您可以遵循两种路径持久化或临时输出用于覆盖的所有内容。

持久性

在下面的用例中,我们将加载inputDF,并立即将其保存为另一个元素并持久化。在执行操作时,持久化将针对数据,而不是文件夹中的文件路径。

否则,您可以对outputDF进行持久化,这将具有相对相同的效果。因为持久性与数据而非文件路径相关,所以对输入的破坏不会导致文件路径在覆盖过程中丢失。

val inputDF = spark.read.format("parquet").load(inputLocation) 
val inputDF2 = inputDF.persist
inputDF2.count
inputDF2.write.format("parquet").mode("overwrite").save(tempLocation)
val tempDF = spark.read.foramt("parquet").load(tempLocation)
val outputDF = inputDF2.unionAll(tempDF) outputDF.write.format("parquet").mode("overwrite").save(inputLocation)

临时荷载

如果不是为并集输入加载临时输出,而是将outputDF完全加载到一个临时文件并为输出重新加载该文件,则不应看到未找到文件的错误。

最新更新