Databricks:带有流源的查询必须使用writeStream.start()执行



我知道关于这个错误消息还有其他几个问题,但似乎没有一个与我目前面临的问题有关。我从JSON文件流(这部分工作):

gamingEventDF = (spark
.readStream
.schema(eventSchema) 
.option('streamName','mobilestreaming_demo') 
.option("maxFilesPerTrigger", 1)
.json(inputPath) 
)

接下来我要使用writeStream将其附加到表中:

def writeToBronze(sourceDataframe, bronzePath, streamName):
(sourceDataframe.rdd
.spark
.writeStream.format("delta")
.option("checkpointLocation", bronzePath + "/_checkpoint")
.queryName(streamName)
.outputMode("append") 
.start(bronzePath)
)

当我现在运行:

writeToBronze(gamingEventDF, outputPathBronze, "bronze_stream")

我得到的错误:AnalysisException:查询流源必须与writeStream.start()执行

Btw:当我删除。rdd,我得到另一个错误('DataFrame'对象没有属性'spark')

你知道我做错了什么吗?谢谢你。

writeStream方法只能在dataframe类上使用,而不能在SparkSession上使用。

下面的代码应该为你工作。

def writeToBronze(sourceDataframe, bronzePath, streamName):
(sourceDataframe
.writeStream.format("delta")
.option("checkpointLocation", bronzePath + "/_checkpoint")
.queryName(streamName)
.outputMode("append") 
.start(bronzePath)
.awaitTermination())

相关内容

最新更新