我正在尝试读取一个镶木地板文件作为一个数据帧,该文件将定期更新(路径为/folder_name
。每当有新数据出现时,旧镶木地板的文件路径(/folder_name
)将被重命名为临时路径,然后我们将新数据和旧数据合并,并将存储在旧路径中(/folder_name
)
假设我们在更新之前有一个拼花地板文件hdfs://folder_name/part-xxxx-xxx.snappy.parquet
,然后在更新之后它被更改为hdfs://folder_name/part-00000-yyyy-yyy.snappy.parquet
发生的问题是,当我试图在进行更新时读取镶木地板文件
sparksession.read.parquet("filename")=>它采用旧路径hdfs://folder_name/part-xxxx-xxx.snappy.parquet
(路径存在)
当在数据帧上调用一个操作时,它正试图从hdfs://folder_name/part-xxxx-xxx.snappy.parquet
读取数据,但由于更新,文件名发生了更改,我得到了以下问题
java.io.FileNotFoundException:文件不存在:hdfs://folder_name/part-xxxx-xxx.snappy.parquet
基础文件可能已更新。通过在SQL中运行"REFRESH TABLE tableName"命令或重新创建所涉及的数据集/DataFrame,可以显式地使Spark中的缓存无效。
我正在使用Spark 2.2
有人能帮我刷新元数据吗?
当您试图读取不存在的文件时,会发生此错误。
如果我错了,请纠正我,但我怀疑您在保存新数据帧(使用.mode("overwrite")
)时覆盖了所有文件。当该进程运行时,您正试图读取一个已删除的文件,而该异常被抛出——这会使表在一段时间内(在更新期间)不可用。
据我所知,没有一种直接的方法可以按照您的意愿"刷新元数据"。
解决这一问题的两种(几种可能的)方法:
1-使用附加模式
如果您只想将新的数据帧附加到旧的数据帧,则无需创建临时文件夹并覆盖旧的文件夹。您只需将保存模式从覆盖更改为附加即可。通过这种方式,您可以将分区添加到现有的Parquet文件中,而不必重写现有的分区。
df.write
.mode("append")
.parquet("/temp_table")
这是迄今为止最简单的解决方案,不需要读取已经存储的数据。然而,如果你必须更新旧数据(例如:如果你正在进行追加销售),这将不起作用。为此,您可以选择2:
2-使用配置单元视图
您可以创建配置单元表,并使用视图指向最新的(可用的)配置单元表。
以下是这种方法背后的逻辑示例:
第1部分
- 如果视图
<table_name>
不存在,我们将创建一个名为<table_name>_alpha0
存储新数据 - 创建表格后我们将视图
<table_name>
创建为select * from <table_name>_alpha0
第2部分
如果视图
<table_name>
存在,我们需要查看它指向哪个表(<table_name>_alphaN)
您可以对新数据执行所需的所有操作,将其保存为名为
<table_name>_alpha(N+1)
的表创建表后,我们将视图
<table_name>
更改为select * from <table_name>_alpha(N+1)
还有一个代码示例:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import spark.implicits._
//This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')
def getCurrentTable(spark: SparkSession, databaseName:String, tableName: String): Option[String] = {
if(spark.catalog.tableExists(s"${databaseName}.${tableName}")) {
val rdd_desc = spark.sql(s"describe formatted ${databaseName}.${tableName}")
.filter("col_name == 'View Text'")
.rdd
if(rdd_desc.isEmpty()) {
None
}
else {
Option(
rdd_desc.first()
.get(1)
.toString
.toLowerCase
.stripPrefix("select * from ")
)
}
}
else
None
}
//This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2, the next one will be alpha0 again.
def saveDataframe(spark: SparkSession, databaseName:String, tableName: String, new_df: DataFrame, rounds: Int = 3): Unit ={
val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(s"${databaseName}.${tableName}_alpha${rounds-1}")
val nextAlphaTable = currentTable.replace(s"_alpha${currentTable.last}",s"_alpha${(currentTable.last.toInt + 1) % rounds}")
new_df.write
.mode("overwrite")
.format("parquet")
.option("compression","snappy")
.saveAsTable(nextAlphaTable)
spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${nextAlphaTable}")
}
//An example on how to use this:
//SparkSession: spark
val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
val dbName = "test_db"
val tableName = "alpha_test_table"
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
println("Saving dataframe")
saveDataframe(spark, dbName, tableName, df)
println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show
val processed_df = df.unionByName(new_data) //Or other operations you want to do
println("Saving new dataframe")
saveDataframe(spark, dbName, tableName, processed_df)
println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show
结果:
Current table: Table does not exist
Saving dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha0
+---+---------+
| id| text|
+---+---------+
| 3| a|
| 4|dataframe|
| 1| I|
| 2| am|
+---+---------+
Saving new dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha1
+---+---------+
| id| text|
+---+---------+
| 3| a|
| 4|dataframe|
| 5| with|
| 6| new|
| 7| data|
| 1| I|
| 2| am|
+---+---------+
通过这样做,您可以保证视图<table_name>
的某个版本始终可用。这还具有维护表的早期版本的优点(或者不这样做,取决于您的情况)即以前版本的<table_name_alpha1>
将是<table_name_alpha0>
3-奖金
如果可以选择升级您的Spark版本,请查看Delta Lake(最低Spark版本:2.4.2)
希望这有帮助:)
首先缓存镶木地板,然后进行覆盖。
var tmp = sparkSession.read.parquet("path/to/parquet_1").cache()
tmp.write.mode(SaveMode.Overwrite).parquet("path/to/parquet_1") // same path
由于spark执行延迟求值,因此引发错误。当DAG执行"写入"命令时,它开始读取镶木地板并同时写入/覆盖。
Spark没有像Zookeeper这样的事务管理器来锁定文件,因此并发读/写是一个需要单独处理的挑战。
要刷新目录,您可以执行以下操作:-
spark.catalog.refreshTable("my_table")
或
spark.sql(s"REFRESH TABLE $tableName")
-
一个简单的解决方案是使用df.cache.count首先引入内存,然后与新数据进行并集,并使用模式
overwrite
写入/folder_name
。在这种情况下,您不必使用temp
路径。 -
您提到要将
/folder_name
重命名为某个临时路径。因此,您应该从该临时路径读取旧数据,而不是hdfs://folder_name/part-xxxx-xxx.snappy.parquet
。
示例
从阅读你的问题中,我认为这可能是你的问题,如果是这样的话,你应该能够在不使用DeltaLake的情况下运行你的代码。在下面的用例中,Spark将这样运行代码:(1)加载inputDF,本地存储文件夹位置的文件名[在这种情况下是显式的部分文件名];(2a)到达第2行并覆盖tempLocation内的文件;(2b)从inputDF加载内容并将其输出到tempLocation;(3) 遵循与1相同的步骤,但在tempLocation上;(4a)删除inputLocation文件夹内的文件;以及(4b)尝试加载缓存在1中的部件文件以从inputDF加载数据以运行并集并中断,因为该文件不存在。
val inputDF = spark.read.format("parquet").load(inputLocation)
inputDF.write.format("parquet").mode("overwrite").save(tempLocation)
val tempDF = spark.read.foramt("parquet").load(tempLocation)
val outputDF = inputDF.unionAll(tempDF)
outputDF.write.format("parquet").mode("overwrite").save(inputLocation)
根据我的经验,您可以遵循两种路径持久化或临时输出用于覆盖的所有内容。
持久性
在下面的用例中,我们将加载inputDF,并立即将其保存为另一个元素并持久化。在执行操作时,持久化将针对数据,而不是文件夹中的文件路径。
否则,您可以对outputDF进行持久化,这将具有相对相同的效果。因为持久性与数据而非文件路径相关,所以对输入的破坏不会导致文件路径在覆盖过程中丢失。
val inputDF = spark.read.format("parquet").load(inputLocation)
val inputDF2 = inputDF.persist
inputDF2.count
inputDF2.write.format("parquet").mode("overwrite").save(tempLocation)
val tempDF = spark.read.foramt("parquet").load(tempLocation)
val outputDF = inputDF2.unionAll(tempDF) outputDF.write.format("parquet").mode("overwrite").save(inputLocation)
临时荷载
如果不是为并集输入加载临时输出,而是将outputDF完全加载到一个临时文件并为输出重新加载该文件,则不应看到未找到文件的错误。