How do I stream a single Kafka topic, filtered by key, to multiple HDFS locations?



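I am unable to stream my data into multiple HDFS locations filtered by key; the code below does not work. Please help me find the correct way to write this code.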

val ER_stream_V1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", configManager.getString("Kafka.Server"))
  .option("subscribe", "Topic1")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

val ER_stream_V2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", configManager.getString("Kafka.Server"))
  .option("subscribe", "Topic1")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

ER_stream_V1.toDF()
  .select(col("key"), col("value").cast("string"))
  .filter(col("key")==="Value1")
  .select(functions.from_json(col("value").cast("string"), Value1Schema.schemaExecution).as("value"))
  .select("value.*")
  .writeStream
  .format("orc")
  .option("metastoreUri", configManager.getString("spark.datasource.hive.warehouse.metastoreUri"))
  .option("checkpointLocation", "/tmp/teststreaming/execution/checkpoint2005")
  .option("path", "/tmp/test/value1")
  .trigger(Trigger.ProcessingTime("5 Seconds"))
  .partitionBy("jobid")
  .start()

ER_stream_V2.toDF()
  .select(col("key"), col("value").cast("string"))
  .filter(col("key")==="Value2")
  .select(functions.from_json(col("value").cast("string"), Value2Schema.schemaJobParameters).as("value"))
  .select("value.*")
  .writeStream
  .format("orc")
  .option("metastoreUri", configManager.getString("spark.datasource.hive.warehouse.metastoreUri"))
  .option("checkpointLocation", "/tmp/teststreaming/jobparameters/checkpoint2006")
  .option("path", "/tmp/test/value2")
  .trigger(Trigger.ProcessingTime("5 Seconds"))
  .partitionBy("jobid")
  .start()

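You shouldn't need two readers. Create one and filter it twice. You may also want to consider setting startingOffsets to earliest so that data already in the topic is read.

For example: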

import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.col

// Read the topic once; Kafka keys and values arrive as binary, so cast both to string
val ER_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", configManager.getString("Kafka.Server"))
  .option("subscribe", "Topic1")
  .option("startingOffsets", "latest")  // maybe change to "earliest"?
  .option("failOnDataLoss", "false")
  .load()
  .select(col("key").cast("string").as("key"), col("value").cast("string"))

// Records whose key is "Value1", parsed with the execution schema
val value1Stream = ER_stream
  .filter(col("key") === "Value1")
  .select(functions.from_json(col("value"), Value1Schema.schemaExecution).as("value"))
  .select("value.*")

// Records whose key is "Value2", parsed with the job-parameters schema
val value2Stream = ER_stream
  .filter(col("key") === "Value2")
  .select(functions.from_json(col("value"), Value2Schema.schemaJobParameters).as("value"))
  .select("value.*")

// One writer per filtered stream
value1Stream.writeStream.format("orc")
  ...
  .start()

value2Stream.writeStream.format("orc")
  ...
  .start()
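
The writer parts elided above could look roughly like the following. This is only a sketch under stated assumptions, not the asker's actual job: it reuses the output paths, checkpoint locations, trigger interval and `jobid` partition column from the question, but the object name `KeyFilteredSinks`, the helpers `selectByKey` and `startOrcSink`, the placeholder `exampleSchema` (standing in for `Value1Schema.schemaExecution` and `Value2Schema.schemaJobParameters`), and the broker address `localhost:9092` are all made up for illustration. It also leaves out the `metastoreUri` option from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object KeyFilteredSinks {
  // Hypothetical placeholder for the real schemas used in the question.
  val exampleSchema: StructType = StructType(Seq(
    StructField("jobid", StringType),
    StructField("payload", StringType)
  ))

  // Keep only the records with the given key and expand the JSON value into columns.
  def selectByKey(stream: DataFrame, key: String, schema: StructType): DataFrame =
    stream
      .filter(col("key") === key)
      .select(from_json(col("value"), schema).as("value"))
      .select("value.*")

  // Start one ORC file sink; every query gets its own checkpoint directory.
  def startOrcSink(df: DataFrame, path: String, checkpoint: String): StreamingQuery =
    df.writeStream
      .format("orc")
      .option("checkpointLocation", checkpoint)
      .option("path", path)
      .partitionBy("jobid")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("key-filtered-sinks").getOrCreate()

    // Single reader shared by both filtered streams.
    val erStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "Topic1")
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", "false")
      .load()
      .select(col("key").cast("string").as("key"), col("value").cast("string").as("value"))

    startOrcSink(selectByKey(erStream, "Value1", exampleSchema),
      "/tmp/test/value1", "/tmp/teststreaming/execution/checkpoint2005")
    startOrcSink(selectByKey(erStream, "Value2", exampleSchema),
      "/tmp/test/value2", "/tmp/teststreaming/jobparameters/checkpoint2006")

    // Block until one of the queries stops or fails; without this, main returns immediately.
    spark.streams.awaitAnyTermination()
  }
}
```

Each `start()` call launches an independent streaming query, which is why the two sinks use separate checkpoint directories. Whether the final `awaitAnyTermination()` call is needed depends on how the application is launched; it is included here on the assumption that `main` is the entry point of a standalone job.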
