如何将这个简单的 Spark 流代码转换为多线程代码?



我正在学习Scala的Kafka。附加的代码只是使用 Kafka 和 Spark Streaming 的字数统计实现。 如何在流式传输时为每个分区单独执行使用者?请帮忙!

这是我的代码:


class ConsumerM(topics: String, bootstrap_server: String, group_name: String) {
Logger.getLogger("org").setLevel(Level.ERROR)
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
.setMaster("local[*]")
.set("spark.executor.memory","1g")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val topicsSet = topics.split(",")
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap_server,
ConsumerConfig.GROUP_ID_CONFIG -> group_name,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
"auto.offset.reset" ->"earliest")

val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}

假设您的输入主题有多个分区,那么另外设置local[*]意味着每个 CPU 内核将有一个 Spark 执行器,并且每个内核至少可以使用一个分区

最新更新