我知道关于这个错误消息还有其他几个问题,但似乎没有一个与我目前面临的问题有关。我从JSON文件流(这部分工作):
gamingEventDF = (spark
.readStream
.schema(eventSchema)
.option('streamName','mobilestreaming_demo')
.option("maxFilesPerTrigger", 1)
.json(inputPath)
)
接下来我要使用writeStream将其附加到表中:
def writeToBronze(sourceDataframe, bronzePath, streamName):
(sourceDataframe.rdd
.spark
.writeStream.format("delta")
.option("checkpointLocation", bronzePath + "/_checkpoint")
.queryName(streamName)
.outputMode("append")
.start(bronzePath)
)
当我现在运行:
writeToBronze(gamingEventDF, outputPathBronze, "bronze_stream")
我得到的错误:AnalysisException:查询流源必须与writeStream.start()
执行
Btw:当我删除。rdd,我得到另一个错误('DataFrame'对象没有属性'spark')
你知道我做错了什么吗?谢谢你。
writeStream方法只能在dataframe类上使用,而不能在SparkSession上使用。
下面的代码应该为你工作。
def writeToBronze(sourceDataframe, bronzePath, streamName):
(sourceDataframe
.writeStream.format("delta")
.option("checkpointLocation", bronzePath + "/_checkpoint")
.queryName(streamName)
.outputMode("append")
.start(bronzePath)
.awaitTermination())