这是我尝试转换它时遇到的异常。
val df_col = df.select("ts.user.friends_count").collect.map(_.toSeq)
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
我所做的只是在结构化流中复制以下 sql.dataframe 操作。
df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))
它在数据帧中运行良好,但在结构化流中运行不正常。
即使在Spark Core的RDD世界中,collect
也是一个很大的禁忌,因为你可以传输回驱动程序的单个JVM的数据大小。它只是设置了Spark的好处的边界,就像collect
你在单个JVM中一样。
话虽如此,请考虑永远不会终止的无限数据,即数据流。这就是Spark结构化流。
流式处理数据集是一个永远不会完整的数据集,每次您请求内容时,其中的数据都会发生变化,即对数据流执行结构化查询的结果。
您根本不能说"嘿,给我作为流数据集内容的数据"。这甚至没有意义。
这就是无法collect
流式处理数据集的原因。Spark 2.2.1(撰写本文时的最新版本)是不可能的。
如果要在一段时间内(也称为 Spark 流式处理中的批处理间隔或 Spark 结构化流式处理中的触发器)接收流式处理数据集中的数据,请将结果写入流式处理接收器,例如console
.
还可以编写在addBatch
内部执行collect.map(_.toSeq)
的自定义流式处理接收器,这是流式处理接收器的主要且唯一的方法。事实上,console
水槽正是这样做的。
我所要做的就是复制以下sql.data帧 结构化流式处理中的操作。
df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))
它在数据帧中运行良好,但在结构化流中运行不正常。
我想到的第一个解决方案是使用foreach
水槽:
foreach
运算允许对输出数据计算任意运算。
当然,这并不意味着这是最好的解决方案。只是我立即想到的一个。