我正在阅读这本书Spark the Definitive Guide以下代码使用Spark shell 在本地执行
程序:在没有任何其他选项的情况下启动火花外壳
val static = spark.read.json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema) .option("maxFilesPerTrigger",1).json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
val activityCounts = streaming.groupBy("gt").count()
val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()
activityQuery.awaitTermination()
书上说
执行此代码后,流计算将在后台中开始
现在这个流正在运行,我们可以通过查询来试验结果
我的观察结果:
当执行此代码时,它不会释放shell以供我键入命令如
`spark.streams.active`
因此,我无法查询此流
我的重新搜索
我试图打开一个新的spark shell,但在该shell中查询不会返回任何结果。从这个shell获得的流是否可以从该shell的其他实例访问。
第1版:
我想把表放在内存中,这样我就可以使用命令进行查询
for( i <- 1 to 5)
{
spark.sql("SELECT * FROM activity_counts").show()
Thread.sleep(1000)
}
两点:
1( 确保为Spark shell 分配了足够数量的Core
运行流媒体/结构化流媒体应用程序需要至少2个核心,以防止出现饥饿场景,即当流媒体应用启动时,1个核心将分配给接收器;如果您启动只有1个核心的spark应用程序,则将没有核心可用于处理执行器收到的消息。
检查火花壳中的芯数:
spark.conf.get("spark.master")
发射4核火花弹
spark-shell --master local[4]
2( 您正在将流写入Memory,这将不会在控制台中显示输出,要显示您必须注册表,然后进行查询。
相反,您可以将格式从内存更改为控制台以查看控制台中的内容。
val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("console").outputMode("complete").start()
我知道答案真的很晚了,你现在已经找到了,以防别人现在问这个问题:
您需要使用spark.ui.showConsoleProgress=false启动spark shell
spark-shell --conf spark.ui.showConsoleProgress=false
正如Lakshman Battini 所提到的
将格式从内存更改为控制台以查看控制台
要了解更多信息,您可以看到以下问题:
如何在火花中抑制输出控制台的"Stage 2===>"?
普伦对此给出了完美的答案。
有关控制台进度条的更多信息,请访问此链接
在Spark shell中,只需避开activityQuery.awaitTermination()
。流已使用.start()
启动。
您还可以通过activityQuery.stop()
停止流