为什么Spark Streaming应用程序使用sbt运行可以正常工作,但在Tomcat(作为Web应用程序)上却不能?



我在 Scala 中有一个 Spark 应用程序,它每 10 秒从 Kafka 获取记录并将它们保存为文件。这是 SBT 项目,我使用sbt run命令运行我的应用程序。在我在Tomcat上部署我的应用程序之前,一切正常。我设法使用此插件生成 WAR 文件,但看起来我的应用程序在部署在 Tomcat 上时没有任何作用。
这是我的代码:

object SparkConsumer {
def main (args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("mytopic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value)).print
val arr = new ArrayBuffer[String]();
val lines = stream.map(record => (record.key, record.value));
stream.foreachRDD { rdd =>
if (rdd.count() > 0 ) {
val date = System.currentTimeMillis()
rdd.saveAsTextFile ("/tmp/sparkout/mytopic/" + date.toString)
rdd.foreach { record => println("t=" + record.topic + " m=" + record.toString()) }
}
println("Stream had " + rdd.count() + " messages")
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
println(o)
}
}
stream.saveAsTextFiles("/tmp/output")

ssc.start()
ssc.awaitTermination()
}
}

奇怪的是,该应用程序在通过sbt run命令运行时完全正常工作。它正确地从 Kafka 读取记录,并将它们作为文件保存在所需的目录中。我不知道发生了什么。我尝试使用log4j启用日志记录,但它在 Tomcat 上甚至没有记录任何内容。我一直在寻找答案,但还没有找到解决方案。

总结一下

我的 Scala Spark 应用程序(这是 SBT 项目)应该从 Kafka 读取记录,并每 10 秒将它们保存为文件。当通过sbt run命令运行时,它可以工作,但在Tomcat上部署时则不工作。

附加信息:

  • 斯卡拉 2.12
  • 雄猫 7
  • SBT 0.13.15
  • 要求更多

问:问题出在哪里?

tl;dr独立应用程序SparkConsumer在Tomcat上正常运行,Tomcat本身也是如此。

我很惊讶读到这个问题,因为你的代码不是我期望在 Tomcat 上工作的东西。不好意思。

Tomcat 是一个 servlet 容器,因此需要 Web 应用程序中的 servlet。

即使您设法创建了一个 WAR 并将其部署到 Tomcat,您也没有从此 Web 应用程序"触发"任何内容来启动 Spark Streaming 应用程序(main方法中的代码)。

Spark Streaming应用程序在使用sbt run执行时确实可以正常工作,因为这是sbt run的目标,即在SBT管理的项目中执行独立应用程序。

鉴于您的 sbt 项目中只有一个独立的应用程序,sbt run设法找到了SparkConsumer并执行了其main输入方法。这里不足为奇。

然而,它不适用于Tomcat。您必须将应用程序公开为 POST 或 GET 端点,并使用 HTTP 客户端(浏览器或命令行工具,如 curl、wget 或 httpie)来执行它。

Spark不支持Scala 2.12,所以...你是如何设法将Scala版本与Spark一起使用的?!不可能的!

相关内容

最新更新