java.util.concurrent.TimeoutException error in Spark on Databricks



I'm using the following custom receiver to consume data from RabbitMQ in Spark with Scala. Below is my code.

def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    override def run() { receive() }
  }.start()
}

def onStop() {
  // There is nothing much to do, as the thread calling receive()
  // is designed to stop by itself if isStopped() returns false
}

/** Create a socket connection and receive data until the receiver is stopped */
def receive() {
  var socket: Socket = null
  var userInput: String = null
  try {
    val batchInterval = Seconds(20)
    val ssc = new StreamingContext(sc, batchInterval)

    val host = ""
    val port = ""
    val queueName = ""
    val vHost = ""
    val userName = ""
    val password = ""
    val maxMessagesPerPartition = "1000"
    val maxReceiveTime = "0.9"

    // Pass the variables declared above, not their names as string literals
    val receiverStream = RabbitMQUtils.createStream(ssc, Map(
      "host" -> host,
      "port" -> port,
      "queueName" -> queueName,
      "vHost" -> vHost,
      "userName" -> userName,
      "password" -> password,
      "maxMessagesPerPartition" -> maxMessagesPerPartition,
      "maxReceiveTime" -> maxReceiveTime
    ))

    val lines = ssc.receiverStream(new CustomReceiver(host, port.toInt))

    // The implicits must be in scope before rdd.toDF is called
    import sqlContext.implicits._
    lines.foreachRDD { rdd =>
      val df = rdd.toDF()
      df.write.format("parquet").mode("append").save("path")
    }
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  } catch {
    // Restart the receiver if there was an error connecting or receiving
    case t: Throwable => restart("Error receiving data", t)
  }
}

I am getting the following timeout error.

java.util.concurrent.TimeoutException
at org.apache.spark.util.ThreadUtils$.runInNewThreadWithTimeout(ThreadUtils.scala:351)
at org.apache.spark.util.ThreadUtils$.runInNewThread(ThreadUtils.scala:283)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:577)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:87)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:188)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw$$iw.<init>(command-1212830188116081:190)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw$$iw.<init>(command-1212830188116081:192)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw$$iw.<init>(command-1212830188116081:194)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$$iw.<init>(command-1212830188116081:196)
at lined28d5369d60244b0a66d1d87a30c93a027.$read.<init>(command-1212830188116081:198)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$.<init>(command-1212830188116081:202)
at lined28d5369d60244b0a66d1d87a30c93a027.$read$.<clinit>(command-1212830188116081)
at lined28d5369d60244b0a66d1d87a30c93a027.$eval$.$print$lzycompute(<notebook>:7)
at lined28d5369d60244b0a66d1d87a30c93a027.$eval$.$print(<notebook>:6)
at lined28d5369d60244b0a66d1d87a30c93a027.$eval.$print(<notebook>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)


Does this error mean that the current Spark cluster configuration cannot handle the incoming message load? Is it related to a memory issue? Can anyone help?

I suggest increasing spark.driver.memory to a higher value.
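Note that spark.driver.memory must be set before the driver JVM starts, so on Databricks it belongs in the cluster's Spark config (or in --driver-memory when using spark-submit) rather than in notebook code. A minimal sketch for a standalone application, where the app name and the 8g value are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative values: set spark.driver.memory when the application is
// submitted (spark-submit --driver-memory 8g) or in the Databricks
// cluster's Spark config; it cannot be changed from a running notebook.
val conf = new SparkConf()
  .setAppName("rabbitmq-streaming") // hypothetical app name
  .set("spark.driver.memory", "8g") // illustrative value
val ssc = new StreamingContext(conf, Seconds(20))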

Also try increasing spark.sql.broadcastTimeout.
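The broadcast timeout, by contrast, can be raised at runtime through the session's runtime config; a minimal sketch (the 1200-second value is illustrative):

// spark.sql.broadcastTimeout is in seconds and defaults to 300
spark.conf.set("spark.sql.broadcastTimeout", "1200")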

See T. Gawęda's answer.
