我正在尝试将foreachBatch
与spark结构化流一起使用。我在spark-shell
控制台上尝试了代码,它工作起来没有任何问题,但当我试图编译代码时,我得到了以下错误。
value foreachBatch不是org.apache.spark.sql.streaming.DataStreamWriter[org.apache.sspark.sql.Row]的成员[error]可能的原因:"value foreachBatch"之前可能缺少分号?[error].foreachBatch{(batchDf:DataFrame,batchId:Long(=>batchDf
我的代码是这样的。
val query = finalStream
.writeStream
.foreachBatch { (batchDf: DataFrame, batchId: Long) => batchDf
.write
.format("com.databricks.spark.redshift")
.option("url", StreamingCfg.redshiftJdbcUrl)
.option("dbtable", redshiftTableName)
.option("aws_iam_role", StreamingCfg.redshiftARN)
.option("tempdir", redshiftTempDir)
.mode(SaveMode.Append)
.save()
batchDf
.write
.mode(SaveMode.Append)
.partitionBy("date_key", "hour")
.parquet(outputLocation);
}
.trigger(Trigger.ProcessingTime(aggregationTime.seconds))
.option("checkpointLocation", checkPointingLocation)
.start()
有人知道我在这里错过了什么吗?
关于我正在做的事情,从卡夫卡读取两个流->让流加入他们的行列->同时将其写入红移和S3。谢谢
试着这样使用它:
finalStream
.writeStream
.foreachBatch( (batchDF: DataFrame, batchId: Long ) => {
})
如果它在sparkshell中工作,您应该仔细检查工作(dev(环境中的依赖关系。确保它能够加载所有的spark依赖项,并且使用正确的版本。