Spark shell不允许查询结构化流



我正在阅读这本书Spark the Definitive Guide以下代码使用Spark shell 在本地执行

程序:在没有任何其他选项的情况下启动火花外壳

val static = spark.read.json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema) .option("maxFilesPerTrigger",1).json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
val activityCounts = streaming.groupBy("gt").count()
val activityQuery  = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()
activityQuery.awaitTermination()

书上说

执行此代码后,流计算将在后台中开始

现在这个流正在运行,我们可以通过查询来试验结果

我的观察结果

当执行此代码时,它不会释放shell以供我键入命令如

`spark.streams.active`

因此,我无法查询此流

我的重新搜索

我试图打开一个新的spark shell,但在该shell中查询不会返回任何结果。从这个shell获得的流是否可以从该shell的其他实例访问。

第1版:

我想把表放在内存中,这样我就可以使用命令进行查询

for( i <- 1 to 5)
{
spark.sql("SELECT * FROM activity_counts").show()
Thread.sleep(1000)
}

两点:

1( 确保为Spark shell 分配了足够数量的Core

运行流媒体/结构化流媒体应用程序需要至少2个核心,以防止出现饥饿场景,即当流媒体应用启动时,1个核心将分配给接收器;如果您启动只有1个核心的spark应用程序,则将没有核心可用于处理执行器收到的消息。

检查火花壳中的芯数:

spark.conf.get("spark.master")

发射4核火花弹

spark-shell --master local[4]

2( 您正在将流写入Memory,这将不会在控制台中显示输出,要显示您必须注册表,然后进行查询。

相反,您可以将格式从内存更改为控制台以查看控制台中的内容。

val activityQuery  = activityCounts.writeStream.queryName("activity_counts").format("console").outputMode("complete").start()

我知道答案真的很晚了,你现在已经找到了,以防别人现在问这个问题:

您需要使用spark.ui.showConsoleProgress=false启动spark shell

spark-shell --conf spark.ui.showConsoleProgress=false

正如Lakshman Battini 所提到的

将格式从内存更改为控制台以查看控制台

要了解更多信息,您可以看到以下问题:

如何在火花中抑制输出控制台的"Stage 2===>"?

普伦对此给出了完美的答案。

有关控制台进度条的更多信息,请访问此链接

在Spark shell中,只需避开activityQuery.awaitTermination()。流已使用.start()启动。

您还可以通过activityQuery.stop()停止流

最新更新