我们是否可以在火花结构化流批处理模式下从特定偏移量中从 Kafka 获取数据



在 kafka 中,我动态获取新主题,我必须使用来自特定偏移量的火花流来处理它。是否有可能从变量传递 json 值。例如,考虑以下代码

val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.load()

在此,我想动态更新启动偏移的值...我尝试在字符串中传递值并调用它,但它不起作用......如果我在启动偏移量中给出相同的值,它就可以工作。在这种情况下如何使用变量?

val start_offset= """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}"""
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", start_offset)
.load()
java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}"""
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("ReadSpecificOffsetFromKafka");
val spark = SparkSession.builder().config(conf).getOrCreate();
spark.sparkContext.setLogLevel("error");
import spark.implicits._;
val start_offset = """{"first_topic" : {"0" : 15, "1": -2, "2": 6}}"""
val fromKafka = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092, localhost:9093")
.option("subscribe", "first_topic")
//      .option("startingOffsets", "earliest")
.option("startingOffsets", start_offset)
.load();
val selectedValues = fromKafka.selectExpr("cast(value as string)", "cast(partition as integer)");
selectedValues.writeStream
.format("console")
.outputMode("append")
//      .trigger(Trigger.Continuous("3 seconds"))
.start()
.awaitTermination();
}

这是使用 Spark 结构化流和 scala 从 kafka 获取特定偏移量的确切代码

  • 看起来你的工作是检查将 Kafka 偏移量指向一些 持久存储。尝试清洁这些。并重新运行作业。
  • 此外,请尝试重命名作业并运行它。

Spark 可以通过readStream读取流。因此,请尝试使用错误消息中显示的偏移量来消除错误。

spark
.readStream
.format("kafka")
.option("subscribePattern", "topic.*")

最新更新