如何在流数据集上执行 df.rdd 或 df.collect().foreach?



这是我尝试转换它时遇到的异常。

val df_col = df.select("ts.user.friends_count").collect.map(_.toSeq)
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

我所做的只是在结构化流中复制以下 sql.dataframe 操作。

df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))

它在数据帧中运行良好,但在结构化流中运行不正常。

即使在Spark Core的RDD世界中,collect也是一个很大的禁忌,因为你可以传输回驱动程序的单个JVM的数据大小。它只是设置了Spark的好处的边界,就像collect你在单个JVM中一样。

话虽如此,请考虑永远不会终止的无限数据,即数据流。这就是Spark结构化流。

流式处理数据集是一个永远不会完整的数据集,每次您请求内容时,其中的数据都会发生变化,即对数据流执行结构化查询的结果。

您根本不能说"嘿,给我作为流数据集内容的数据"。这甚至没有意义。

这就是无法collect流式处理数据集的原因。Spark 2.2.1(撰写本文时的最新版本)是不可能的。

如果要在一段时间内(也称为 Spark 流式处理中的批处理间隔或 Spark 结构化流式处理中的触发器)接收流式处理数据集中的数据,请将结果写入流式处理接收器,例如console.

还可以编写在addBatch内部执行collect.map(_.toSeq)的自定义流式处理接收器,这是流式处理接收器的主要且唯一的方法。事实上,console水槽正是这样做的。

我所要做的就是复制以下sql.data帧 结构化流式处理中的操作。

df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))

它在数据帧中运行良好,但在结构化流中运行不正常。

我想到的第一个解决方案是使用foreach水槽:

foreach运算允许对输出数据计算任意运算。

当然,这并不意味着这是最好的解决方案。只是我立即想到的一个。

最新更新