我正在处理Spark应用程序,该应用程序需要读取Kafka的数据。我创建了一个Kafka主题,生产者正在发布消息。我从控制台消费者中验证了消息已成功发布。
我写了一个简短的Spark应用程序来读取Kafka的数据,但没有获取任何数据。以下是我使用的代码:
def main(args: Array[String]): Unit = {
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
process(lines) // prints the number of records in Kafka topic
ssc.start()
ssc.awaitTermination()
}
private def process(lines: DStream[String]) {
val z = lines.count()
println("count of lines is "+z)
//edit
lines.foreachRDD(rdd => rdd.map(println)
// <-- Why does this **not** print?
)
关于如何解决此问题的任何建议?
******编辑****
我已经使用过
lines.foreachRDD(rdd => rdd.map(println)
在实际代码中也是不起作用的。我设置了帖子中提到的保留期:kafka spark DirectStream无法获取数据。但是仍然存在问题。
您的 process
是> no 输出操作员的DStream
管道的延续
您可以通过阅读count
操作员的签名来"看到"它:
count(): DStream[Long]
引用count
的Scaladoc:
返回一个新的Dstream,每个RDD都通过计算此Dstream的每个RDD来生成一个单个元素。
因此,您有一个kafka记录的Dstream,您将转换为单个值的Dstream(是count
的结果)。输出不多(到控制台或任何其他水槽)。
您必须使用官方文档输出操作中所述的输出运算符结束管道:
输出操作允许将DStream的数据推到数据库或文件系统等外部系统。由于输出操作实际上允许转换的数据被外部系统消耗,因此它们触发了所有Dstream转换的实际执行(类似于RDD的操作)。
(低级别)输出操作员寄存器输入Dstreams作为输出Dstreams,以便执行可以启动。Spark Streaming的DStream
设计没有成为输出Dstream的概念。知道并能够区分输入和输出Dstream是DStreamGraph
。