为什么我的Spark流媒体应用程序未打印来自Kafka(使用计数运算符)的记录数量



我正在处理Spark应用程序,该应用程序需要读取Kafka的数据。我创建了一个Kafka主题,生产者正在发布消息。我从控制台消费者中验证了消息已成功发布。

我写了一个简短的Spark应用程序来读取Kafka的数据,但没有获取任何数据。以下是我使用的代码:

def main(args: Array[String]): Unit = {
   val Array(zkQuorum, group, topics, numThreads) = args
   val sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[2]")
   val ssc = new StreamingContext(sparkConf, Seconds(2))
   val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
   val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
   process(lines) // prints the number of records in Kafka topic
   ssc.start()
   ssc.awaitTermination()
 }
 private def process(lines: DStream[String]) { 
   val z = lines.count()
   println("count of lines is "+z) 
    //edit
   lines.foreachRDD(rdd => rdd.map(println) 
   // <-- Why does this **not** print?
 )

关于如何解决此问题的任何建议?

******编辑****

我已经使用过

lines.foreachRDD(rdd => rdd.map(println)

在实际代码中也是不起作用的。我设置了帖子中提到的保留期:kafka spark DirectStream无法获取数据。但是仍然存在问题。

您的 process> no 输出操作员的DStream管道的延续

您可以通过阅读count操作员的签名来"看到"它:

count(): DStream[Long]

引用count的Scaladoc:

返回一个新的Dstream,每个RDD都通过计算此Dstream的每个RDD来生成一个单个元素。

因此,您有一个kafka记录的Dstream,您将转换为单个值的Dstream(是count的结果)。输出不多(到控制台或任何其他水槽)。

您必须使用官方文档输出操作中所述的输出运算符结束管道:

输出操作允许将DStream的数据推到数据库或文件系统等外部系统。由于输出操作实际上允许转换的数据被外部系统消耗,因此它们触发了所有Dstream转换的实际执行(类似于RDD的操作)。

(低级别)输出操作员寄存器输入Dstreams作为输出Dstreams,以便执行可以启动。Spark Streaming的DStream设计没有成为输出Dstream的概念。知道并能够区分输入和输出Dstream是DStreamGraph

最新更新