Spark 流式处理仅流式传输在流初始化时间之后创建的文件



有没有办法配置textFileStream源,以便无论文件创建时间如何,它都会处理添加到源目录的任何文件?

为了演示此问题,我创建了一个基本的 Spark 流式处理应用程序,该应用程序使用 textFileStream 作为源并将流内容打印到控制台。将运行应用程序之前创建的现有文件复制到源目录时,不会将任何内容打印到控制台。将应用程序开始运行后创建的文件复制到源目录时,将打印文件内容。以下是我的代码供参考。

val conf = new SparkConf().setAppName("Streaming Test")
                          .setMaster("local[*]")
val spark = new SparkContext(conf)
val ssc = new StreamingContext(spark, Seconds(5))
val fileStream = ssc.textFileStream("/stream-source")
val streamContents = fileStream.flatMap(_.split(" "))
streamContents.print()

这是记录在案的 FileInputDStream 行为。

如果我们想使用该目录中的现有文件,我们可以使用 Spark API 加载这些文件并将我们想要的逻辑应用于它们。

val existingFiles = sparkContext.textFile(path)

val existingFilesDS = sparkSession.read.text(path)

然后,设置并启动流式处理逻辑。我们甚至可以在处理新文件时使用现有文件的数据。

相关内容

  • 没有找到相关文章

最新更新