有没有办法配置textFileStream
源,以便无论文件创建时间如何,它都会处理添加到源目录的任何文件?
为了演示此问题,我创建了一个基本的 Spark 流式处理应用程序,该应用程序使用 textFileStream
作为源并将流内容打印到控制台。将运行应用程序之前创建的现有文件复制到源目录时,不会将任何内容打印到控制台。将应用程序开始运行后创建的文件复制到源目录时,将打印文件内容。以下是我的代码供参考。
val conf = new SparkConf().setAppName("Streaming Test")
.setMaster("local[*]")
val spark = new SparkContext(conf)
val ssc = new StreamingContext(spark, Seconds(5))
val fileStream = ssc.textFileStream("/stream-source")
val streamContents = fileStream.flatMap(_.split(" "))
streamContents.print()
这是记录在案的 FileInputDStream 行为。
如果我们想使用该目录中的现有文件,我们可以使用 Spark API 加载这些文件并将我们想要的逻辑应用于它们。
val existingFiles = sparkContext.textFile(path)
或
val existingFilesDS = sparkSession.read.text(path)
然后,设置并启动流式处理逻辑。我们甚至可以在处理新文件时使用现有文件的数据。