如何在读取多分区 kafka 主题时激发结构化流使用者的启动和调用



如果一个kakfa主题有多个分区,在java中,这些许多消费者实例/线程将在消费者端实例化。

如何在火花流消费者方面处理它?我没有找到太多关于相同的信息。同一的任何示例,即在一个主题的火花流消费者处调用多个使用者。

周围的任何设计建议/示例都将不胜感激。

问候希亚姆

如果 Kafka 有多个分区,这意味着消费者可以通过并行执行某个任务从中受益。特别是内部的 Spark 流可以通过增加 num-executors 参数来加快作业速度。这与 Kafka 拥有的分区数量有关,例如,如果你的 Kafka 分区数量与 Spark 中的 num-executors 相同,理论上所有执行器都可以一次读取所有分区,这显然增加了系统吞吐量。

Spark 流式处理始终从 Kafka 中的所有可用分区并行读取数据,只要 Spark 有足够的资源。 这是Spark开箱即用的,我们不需要为此编写任何代码。

例如,如果您的 Kafka 主题有 4 个分区,那么如果您启动 具有 2 个执行程序的 Spark 作业,每个执行程序有 2 个内核,然后是您的 Spark 作业 将启动 4 个任务以从 4 个 Kafka 并行读取数据 分区。

如果您需要更多信息,请随时发表评论。

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

import java.sql.Timestamp
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import java.time.{LocalDate, LocalDateTime}
import java.util.Calendar

object SparkKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("test_app")
      .getOrCreate()
    val sparkContext = spark.sparkContext

    val ssc = new StreamingContext(sparkContext, Seconds(1)) // the polling frequency is 2 seconds, can be modified based on the BM requirements.
///val currentHour = now.get(Calendar.HOUR_OF_DAY)
    log.info("Before starting the Stream -->>")
    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String]
      (Array.apply("Kafka_topic_name"), getKafkaParams()))
      .map(record => record.value)
    stream.foreachRDD { rdd =>
      try {
        if (!rdd.isEmpty()) {
          log.info("rdd is not empty and saving to -->>"+LocalDate.now.getYear+"/"+LocalDate.now.getMonth+"/"+LocalDate.now.getDayOfMonth+"/"+LocalDateTime.now().getHour)
          rdd.saveAsTextFile("hdfs:///<folder to save>") //TODO::: Externalize the HDFS location to Props


          LocalDate.now.getMonth

         if (null != args && null != args {
            0
          } && args {
            0
          }.equals("log")) {
            rdd.foreach(x => print("Message read and saved TO S3 bucket----*****--->>" + x))
          }
        }
      } catch {
        case t: Throwable =>
          t.printStackTrace() // TODO: handle error)
          log.error("Exception occured while processing the data exception is {}", t.getCause)
      }
    }
    ssc.start()
    log.info("started now-->> " + compat.Platform.currentTime)
    ssc.awaitTermination()
  }
  def getKafkaParams(): Map[String, Object] = {
    Map[String, Object]("bootstrap.servers" -> "host:port
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "Group_Name",
      //      "sasl.kerberos.service.name" -> "kafka",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean))
  }
}

最新更新