如何删除Spark Structured Streaming创建的旧数据



如何删除Spark Structured Streaming(Spark 2.4.5(创建的旧数据?

我有拼花地板/avro格式的HDFS数据(而不是Delta(,它是由Spark Structured Streaming创建的,并按时间(年、月、日、小时(划分。

数据创建如下:

query = df.writeStream.format("avro").partitionBy("year", "month", "day", "hour").outputMode("append").option("checkpointLocation", "/data/avro.cp").start("/data/avro")

因此,我有以下分区文件夹布局:

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
./year=2020/month=3/day=13/hour=14
./year=2020/month=3/day=13/hour=15
./year=2020/month=3/day=13/hour=16

如何删除旧数据,例如,年=2020、月=2、日=13、小时=14以上的数据?

只是删除相关文件夹

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13

从文件系统读取批处理数据帧时引发异常:

df = spark.read.format("avro").load("/data/avro")
java.io.FileNotFoundException: File file:/data/avro/year=2020/month=3/day=13/hour=12/part-00000-0cc84e65-3f49-4686-85e3-1ecf48952794.c000.avro does not exist

正如我所发现的,这在某种程度上与检查点使用的_spark_metadata文件夹有关。

感谢你的帮助。

我似乎找到了解决方案/解决方法。关键概念是使用FileStreamSinkLog并将其与SinkFileStatus合并,操作设置为delete:

  1. 加载FileStreamSinkLog

    sinkLog = new FileStreamSinkLog(1, spark, full-path-to-spark-metadata-dir);
    
  2. 获取最新的SinkFileStatus

    Option<Tuple2<Object, SinkFileStatus[]>> latest = sinkLog.getLatest();
    long batchId = (long)latest.get()._1;
    SinkFileStatus[] fileStatuses = latest.get()._2;
    
  3. 删除旧文件

  4. fileStatuses阵列添加具有delete操作的新条目

  5. 用更新的fileStatuses写回batchId日志文件

但是,这需要停止结构化流作业。因此,没有任何解决方案可以在不停止Spark Structured Streaming的情况下删除旧文件

除非您也删除了相应的检查点文件夹,否则您无法删除该文件夹。您正试图在检查点仍然知道该文件夹的情况下删除该文件夹,这就是发生错误的原因。

不过,除非有必要,否则我真的不建议破坏检查点文件夹。如果在您的情况下可能的话,我建议您将旧数据移动到不同的数据存储类型,例如AWS标准->冰川。

为了便于复制/粘贴,这里有一个从spark 3.0.1开始的工作代码(scala(片段。删除一个文件并写入一个新批:

import org.apache.spark.sql.execution.streaming.FileStreamSinkLog
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try
val sinkLog = new FileStreamSinkLog (
1,
spark,
SPARK_METADATA_ROOT
)
val head = sinkLog.allFiles().head
val deleteCommand = s"hadoop fs -rm ${head.path}"
println (Try (deleteCommand ! processlogger).getOrElse(s""""$deleteCommand" failed""") )
head.copy(action = FileStreamSinkLog.DELETE_ACTION)
sinkLog
.add (
latestBatch.get._1+1,
Array(head.copy(action = FileStreamSinkLog.DELETE_ACTION))
)

Spark 3.0.0及以上版本已经实现。

  • 原刊:https://issues.apache.org/jira/browse/SPARK-20568
  • 配置:https://github.com/apache/spark/blob/branch-3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala#L117

基本上,它为提交的文件添加了3种策略(ARCHIVE、DELETE、OFF(,并只允许对其进行配置。

老实说,我自己从来没有尝试过,但我在这里看到了Spark 3+的一些答案,这绝对值得一提。

最新更新