编译错误foreachBatch不是DataStreamWriter的成员,即使它在spark shell上工作



我正在尝试将foreachBatch与spark结构化流一起使用。我在spark-shell控制台上尝试了代码,它工作起来没有任何问题,但当我试图编译代码时,我得到了以下错误。

value foreachBatch不是org.apache.spark.sql.streaming.DataStreamWriter[org.apache.sspark.sql.Row]的成员[error]可能的原因:"value foreachBatch"之前可能缺少分号?[error].foreachBatch{(batchDf:DataFrame,batchId:Long(=>batchDf

我的代码是这样的。

val query = finalStream
.writeStream
.foreachBatch { (batchDf: DataFrame, batchId: Long) => batchDf
.write
.format("com.databricks.spark.redshift")
.option("url", StreamingCfg.redshiftJdbcUrl)
.option("dbtable", redshiftTableName)
.option("aws_iam_role", StreamingCfg.redshiftARN)
.option("tempdir", redshiftTempDir)
.mode(SaveMode.Append)
.save()
batchDf
.write
.mode(SaveMode.Append)
.partitionBy("date_key", "hour")
.parquet(outputLocation);
}
.trigger(Trigger.ProcessingTime(aggregationTime.seconds))
.option("checkpointLocation", checkPointingLocation)
.start()

有人知道我在这里错过了什么吗?

关于我正在做的事情,从卡夫卡读取两个流->让流加入他们的行列->同时将其写入红移和S3。谢谢

试着这样使用它:

finalStream
.writeStream
.foreachBatch( (batchDF: DataFrame, batchId: Long ) => {

})

如果它在sparkshell中工作,您应该仔细检查工作(dev(环境中的依赖关系。确保它能够加载所有的spark依赖项,并且使用正确的版本。

最新更新