Spark Streaming: Exception thrown while writing record: BatchAllocationEvent



I am shutting down a Spark StreamingContext with the code below.

Essentially, a thread watches a boolean switch and then calls StreamingContext.stop(true, true).

Everything appears to get processed, and all of my data appears to be collected. However, I get the following exception on shutdown.

Can I ignore it? It looks like there is potential for data loss.

18/03/07 11:46:40 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(1520452000000 ms,AllocatedBlocks(Map(0 -> ArrayBuffer()))) to the WriteAheadLog.
java.lang.IllegalStateException: close() was called on BatchedWriteAheadLog before write request with time 1520452000001 could be fulfilled.
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:86)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
    at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)

The thread:

// @volatile so the update made by the streaming job is visible to this watcher thread
@volatile var stopScc = false

private def stopSccThread(): Unit = {
  val thread = new Thread {
    override def run(): Unit = {
      var continueRun = true
      while (continueRun) {
        logger.debug("Checking status")
        if (stopScc) {
          // Stop the StreamingContext and its SparkContext, draining received data first
          getSparkStreamingContext(fieldVariables).stop(true, true)
          logger.info("Called Stop on Streaming Context")
          continueRun = false
        }
        Thread.sleep(50)
      }
    }
  }
  thread.start()
}
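
One way to avoid the extra watcher thread entirely is to poll the flag from the thread that would otherwise block in awaitTermination(). A minimal sketch, not the original code, assuming the same stopScc flag and the ssc from startStream below:

var terminated = false
while (!terminated) {
  // true once the context has stopped; false if the 50 ms wait elapsed
  terminated = ssc.awaitTerminationOrTimeout(50)
  if (!terminated && stopScc) {
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    terminated = true
  }
}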

The stream:

@throws(classOf[IKodaMLException])
def startStream(ip: String, port: Int): Unit = {
  try {
    val ssc = getSparkStreamingContext(fieldVariables)
    ssc.checkpoint("./ikoda/cp")

    val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
    lines.print()

    // Flip the stop switch when the end-of-stream sentinel arrives
    val lmap = lines.map { l =>
      if (l.contains("IKODA_END_STREAM")) {
        stopScc = true
      }
      l
    }

    lmap.foreachRDD { r =>
      if (r.count() > 0) {
        logger.info(s"RECEIVED: ${r.toString()} first: ${r.first().toString}")
        r.saveAsTextFile("./ikoda/test/test")
      }
      else {
        logger.info("Empty RDD. No data received")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
  catch {
    case e: Exception =>
      logger.error(e.getMessage, e)
      throw new IKodaMLException(e.getMessage, e)
  }
}
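
For completeness, here is a hypothetical test feeder (not part of the original post) showing how this stream gets its data and how shutdown is triggered: socketTextStream connects as a client, so the data source must already be listening on ip:port, and sending the IKODA_END_STREAM sentinel flips stopScc.

import java.io.PrintWriter
import java.net.ServerSocket

def serveTestLines(port: Int, payload: Seq[String]): Unit = {
  val server = new ServerSocket(port)
  val socket = server.accept() // wait for the receiver to connect
  val out = new PrintWriter(socket.getOutputStream, true)
  payload.foreach(line => out.println(line)) // normal data lines
  out.println("IKODA_END_STREAM") // sentinel that flips stopScc
  out.close()
  socket.close()
  server.close()
}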

Answer: I ran into the same problem, and calling close() instead of stop() fixed it.
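
If close() is not available in your version, another documented option is to let Spark's own JVM shutdown hook perform the graceful stop via the spark.streaming.stopGracefullyOnShutdown setting. A minimal sketch; the app name and batch interval are illustrative assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With this set, Spark's shutdown hook calls stop(stopGracefully = true) on JVM exit
val conf = new SparkConf()
  .setAppName("IKodaStreamingApp") // illustrative name
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(1)) // illustrative batch interval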