使用 scala 读取 hdfs 目录中的文件后如何删除该文件?



我使用 fileStream 从 Spark(流上下文)读取 hdfs 目录中的文件。如果我的 Spark 在一段时间后关闭并启动,我想读取目录中的新文件。我不想读取已被Spark读取和处理的目录中的旧文件。我试图在这里避免重复。

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File")

有什么代码片段可以提供帮助吗?

您可以使用FileSystemAPI:

import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)
val outPutPath = new Path("/abc")
if (fs.exists(outPutPath))
fs.delete(outPutPath, true)

fileStream已经为你处理了这个问题 - 来自它的Scaladoc:

创建一个输入流,用于监控与 Hadoop 兼容的文件系统中的新文件,并使用给定的键值类型和输入格式读取它们。

这意味着fileStream只会加载新文件(在启动流式处理上下文后创建),则在启动流式处理应用程序之前文件夹中已存在的任何文件都将被忽略。

最新更新