如何从HDFS源流式传输时运行多个批处理



我有一个类似val df = spark.readStream.schema(s).parquet ("/path/to/file").where("Foo > 0").groupBy("bar").agg(expr("sum(Foo)"))的数据集。数据集有超过100万条记录,Parquet文件包含1个分区。

我用df.writeStream.outputMode("update").format("console").start启动流。

然后Spark一次处理整个文件。但我希望Spark了解如何在更新结果的同时"拆分"文件并一次处理每个拆分,就像单词计数示例在我输入新词时更新结果一样。

我尝试添加trigger(Trigger.ProcessingTime("x seconds")),但没有成功。

然后Spark一次处理整个文件。但我希望Spark了解如何在更新结果的同时"拆分"文件并一次处理每个拆分,就像单词计数示例在我输入新词时更新结果一样。

这就是Spark结构化流处理文件的方式。它一次处理它们,再也不会考虑它们。它确实将文件"拆分"为多个部分(好吧,这应该在存储中,例如HDFS,而不是Spark本身),但它是在隐蔽的情况下这样做的。

请注意,一旦文件被处理,该文件将不再被处理。

我尝试添加trigger(Trigger.ProcessingTime("x seconds")),但没有成功。

是的,但不是你想要的。

DataStreamWriter.trigger设置流查询的触发器。默认值为ProcessingTime(0),它将尽可能快地运行查询。

请参阅DataStreamWriter的scaladoc。

最新更新