如何使用scala中的spark流将索引列附加到spark数据帧



我使用的是这样的东西:

df.withColumn("idx", monotonically_increasing_id())

但我得到了一个例外,因为它不受支持:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Expression(s): monotonically_increasing_id() is not supported with streaming DataFrames/Datasets;;
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:143)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)

如何在scala中向spark-streaming数据帧添加索引或行号列?

完整堆叠:https://justpaste.it/5bdqr

在Spark streaming的流媒体计划中,有一些操作不可能存在,不幸的是,包括monotonically_increasing_id()。仔细检查transformed1是否失败,并出现您的问题中的错误,以下是Spark源代码中关于此检查的参考:

import org.apache.spark.sql.functions._ 
val df = Seq(("one", 1), ("two", 2)).toDF("foo", "bar")
val schema = df.schema
df.write.parquet("/tmp/out")
val input = spark.readStream.format("parquet").schema(schema).load("/tmp/out")
val transformed1 = input.withColumn("id", monotonically_increasing_id())
transformed1.writeStream.format("parquet").option("format", "append") .option("path", "/tmp/out2") .option("checkpointLocation", "/tmp/checkpoint_path").outputMode("append").start()
import org.apache.spark.sql.expressions.Window
val windowSpecRowNum = Window.partitionBy("foo").orderBy("foo")
val transformed2 = input.withColumn("row_num", row_number.over(windowSpecRowNum))
transformed2.writeStream.format("parquet").option("format", "append").option("path", "/tmp/out2").option("checkpointLocation", "/tmp/checkpoint_path").outputMode("append").start()

此外,我试图在上面的快照中的DF-transformed2中的一列上添加Window索引-它也失败了,但有一个不同的错误(:

"流媒体不支持非基于时间的窗口数据帧/数据集";

您可以在这里找到所有不受支持的Spark Streaming运算符检查-在Spark Batch中添加索引列的传统方法似乎在Spark Stream中不起作用。

相关内容

  • 没有找到相关文章

最新更新