我在 Scala 中有一个 Spark 应用程序,它每 10 秒从 Kafka 获取记录并将它们保存为文件。这是 SBT 项目,我使用sbt run
命令运行我的应用程序。在我在Tomcat上部署我的应用程序之前,一切正常。我设法使用此插件生成 WAR 文件,但看起来我的应用程序在部署在 Tomcat 上时没有任何作用。
这是我的代码:
object SparkConsumer {
def main (args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("mytopic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value)).print
val arr = new ArrayBuffer[String]();
val lines = stream.map(record => (record.key, record.value));
stream.foreachRDD { rdd =>
if (rdd.count() > 0 ) {
val date = System.currentTimeMillis()
rdd.saveAsTextFile ("/tmp/sparkout/mytopic/" + date.toString)
rdd.foreach { record => println("t=" + record.topic + " m=" + record.toString()) }
}
println("Stream had " + rdd.count() + " messages")
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
println(o)
}
}
stream.saveAsTextFiles("/tmp/output")
ssc.start()
ssc.awaitTermination()
}
}
奇怪的是,该应用程序在通过sbt run
命令运行时完全正常工作。它正确地从 Kafka 读取记录,并将它们作为文件保存在所需的目录中。我不知道发生了什么。我尝试使用log4j
启用日志记录,但它在 Tomcat 上甚至没有记录任何内容。我一直在寻找答案,但还没有找到解决方案。
总结一下
我的 Scala Spark 应用程序(这是 SBT 项目)应该从 Kafka 读取记录,并每 10 秒将它们保存为文件。当通过sbt run
命令运行时,它可以工作,但在Tomcat上部署时则不工作。
附加信息:
- 斯卡拉 2.12
- 雄猫 7
- SBT 0.13.15
- 要求更多
问:问题出在哪里?
tl;dr独立应用程序SparkConsumer
在Tomcat上正常运行,Tomcat本身也是如此。
我很惊讶读到这个问题,因为你的代码不是我期望在 Tomcat 上工作的东西。不好意思。
Tomcat 是一个 servlet 容器,因此需要 Web 应用程序中的 servlet。
即使您设法创建了一个 WAR 并将其部署到 Tomcat,您也没有从此 Web 应用程序"触发"任何内容来启动 Spark Streaming 应用程序(main
方法中的代码)。
Spark Streaming应用程序在使用sbt run
执行时确实可以正常工作,因为这是sbt run
的目标,即在SBT管理的项目中执行独立应用程序。
鉴于您的 sbt 项目中只有一个独立的应用程序,sbt run
设法找到了SparkConsumer
并执行了其main
输入方法。这里不足为奇。
然而,它不适用于Tomcat。您必须将应用程序公开为 POST 或 GET 端点,并使用 HTTP 客户端(浏览器或命令行工具,如 curl、wget 或 httpie)来执行它。
Spark不支持Scala 2.12,所以...你是如何设法将Scala版本与Spark一起使用的?!不可能的!