我有一个类似val df = spark.readStream.schema(s).parquet ("/path/to/file").where("Foo > 0").groupBy("bar").agg(expr("sum(Foo)"))
的数据集。数据集有超过100万条记录,Parquet文件包含1个分区。
我用df.writeStream.outputMode("update").format("console").start
启动流。
然后Spark一次处理整个文件。但我希望Spark了解如何在更新结果的同时"拆分"文件并一次处理每个拆分,就像单词计数示例在我输入新词时更新结果一样。
我尝试添加trigger(Trigger.ProcessingTime("x seconds"))
,但没有成功。
然后Spark一次处理整个文件。但我希望Spark了解如何在更新结果的同时"拆分"文件并一次处理每个拆分,就像单词计数示例在我输入新词时更新结果一样。
这就是Spark结构化流处理文件的方式。它一次处理它们,再也不会考虑它们。它确实将文件"拆分"为多个部分(好吧,这应该在存储中,例如HDFS,而不是Spark本身),但它是在隐蔽的情况下这样做的。
请注意,一旦文件被处理,该文件将不再被处理。
我尝试添加
trigger(Trigger.ProcessingTime("x seconds"))
,但没有成功。
是的,但不是你想要的。
DataStreamWriter.trigger设置流查询的触发器。默认值为ProcessingTime(0),它将尽可能快地运行查询。
请参阅DataStreamWriter的scaladoc。