使用带水印的追加输出模式时出现结构化流式处理异常



尽管我正在使用withWatermark(),但在运行火花作业时收到以下错误消息:

线程"main"中的异常 org.apache.spark.sql.AnalysisException:当流数据帧/数据集上存在没有水印的流聚合时,不支持追加输出模式;

从我在编程指南中看到的内容来看,这与预期的用法(和示例代码(完全匹配。有谁知道可能出了什么问题?

提前感谢!

相关代码(Java 8,Spark 2.2.0(:

StructType logSchema = new StructType()
.add("timestamp", TimestampType)
.add("key", IntegerType)
.add("val", IntegerType);
Dataset<Row> kafka = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.load();
Dataset<Row> parsed = kafka
.select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
.select("parsed_value.*");
Dataset<Row> tenSecondCounts = parsed
.withWatermark("timestamp", "10 minutes")
.groupBy(
parsed.col("key"),
window(parsed.col("timestamp"), "1 day"))
.count();
StreamingQuery query = tenSecondCounts
.writeStream()
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode("append")
.format("console")
.option("truncate", false)
.start();

问题出在parsed.col中。将其替换为col将解决此问题。我建议始终使用col函数而不是Dataset.col

Dataset.col返回resolved column,而col返回unresolved column

parsed.withWatermark("timestamp", "10 minutes")将创建一个具有相同名称的新列的新数据集。水印信息附加到新数据集中的timestamp列,而不是parsed.col("timestamp"),因此groupBy中的列没有水印。

当您使用未解析的列时,Spark 将为您找出正确的列。

最新更新