Spark Structured Streaming MemoryStream reports "No data selected" when using a custom Sink



I am trying to write a simple test case for Spark Structured Streaming. The code is inspired by holdenk's examples on GitHub.

Here is the CustomSink code:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// A sink that simply hands every micro-batch to the supplied function.
case class CustomSink(func: DataFrame => Unit)
  extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    func(data)
  }
}

class CustomSinkProvider extends StreamSinkProvider {
  def func(df: DataFrame): Unit = {
    df.show(5)
  }

  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): CustomSink = {
    new CustomSink(func)
  }
}
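
As a side note (not part of the original code): the test below refers to this provider by its fully qualified class name in .format(...). If you also mix in Spark's DataSourceRegister and list the class in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file, the sink can be addressed by a short name instead. A minimal sketch, with a hypothetical subclass name:

import org.apache.spark.sql.sources.DataSourceRegister

// Hypothetical variant: lets callers write .format("customSink")
// instead of the fully qualified provider class name.
class RegisteredCustomSinkProvider extends CustomSinkProvider with DataSourceRegister {
  override def shortName(): String = "customSink"
}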

I try to run it in a test case using MemoryStream:
@Test
def demoCustomSink: Unit = {
  val input = MemoryStream[String]
  val doubled = input.toDS().map(x => x + " " + x)
  // input.addData("init")
  val query = doubled.writeStream
    .queryName("testCustomSinkBasic")
    .format("com.knockdata.spark.highcharts.demo.CustomSinkProvider")
    .start()
  input.addData("hi")
  query.processAllAvailable()
}

Without the line input.addData("init"), the following error is reported:
2016-10-12 03:48:37 ERROR StreamExecution       :91 - Query testCustomSinkBasic terminated with error
java.lang.RuntimeException: No data selected!
  at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.streaming.MemoryStream$$anonfun$getBatch$4.apply(memory.scala:110)
at org.apache.spark.sql.execution.streaming.MemoryStream$$anonfun$getBatch$4.apply(memory.scala:110)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.MemoryStream.getBatch(memory.scala:109)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:332)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:329)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:329)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:184)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:120)

If I uncomment the line input.addData("init"), the test case runs successfully and no error is reported, but the value init never reaches the sink.

Only the value hi hi is shown.

Why? How can I fix this?

There is a checkpointing mechanism at work in the background. When spark.sql.streaming.checkpointLocation is set, each named query checkpoints under <checkpointLocation>/<queryName>; if stale data from an earlier run is left in that directory, the restarted query resumes from the offsets recorded there. MemoryStream keeps its batches only in memory, so a batch referenced by an old checkpoint can no longer be reproduced and getBatch fails with "No data selected!", while rows whose offsets the checkpoint already marks as processed (such as init) are never delivered to the sink.

Create a helper method that clears the directory, along the lines below.

import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Temp directory used as the streaming checkpoint location; wiped before each test.
val checkpointPath = Files.createTempDirectory("query")
val checkpointDir = checkpointPath.toFile
checkpointDir.deleteOnExit()

def deleteRecursively(file: java.io.File): Unit = {
  if (file.isDirectory) {
    file.listFiles().foreach(deleteRecursively)
  }
  file.delete()
}

def clearCheckpointDir(): Unit = {
  checkpointDir.listFiles().foreach(deleteRecursively)
}

lazy val spark = SparkSession
  .builder()
  .appName("test")
  .master("local[*]")
  .config("spark.sql.streaming.checkpointLocation",
    checkpointDir.getAbsolutePath)
  .getOrCreate()
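
As an alternative (not part of the original answer), instead of clearing the shared directory you could give each started query its own throwaway checkpoint directory through the writer's checkpointLocation option. A rough sketch, reusing doubled from the test case above and the Files import from the snippet just shown:

// Hypothetical alternative: every run gets a fresh, private checkpoint directory.
val query = doubled.writeStream
  .queryName("testCustomSinkBasic")
  .format("com.knockdata.spark.highcharts.demo.CustomSinkProvider")
  .option("checkpointLocation",
    Files.createTempDirectory("testCustomSinkBasic").toString)
  .start()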

Then, in the test class with the custom sink, I add the following @Before hook:

@Before
def before(): Unit = {
  clearCheckpointDir()
}
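
Putting it together, a complete test harness could look roughly like the sketch below. The class name, appName, and the explicit implicits are my own assumptions, not from the original code:

import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.junit.{Before, Test}

class CustomSinkSuite {
  // Fresh temp directory used as the global streaming checkpoint location.
  private val checkpointDir = Files.createTempDirectory("query").toFile
  checkpointDir.deleteOnExit()

  private lazy val spark = SparkSession.builder()
    .master("local[*]")
    .appName("test")
    .config("spark.sql.streaming.checkpointLocation", checkpointDir.getAbsolutePath)
    .getOrCreate()

  private def deleteRecursively(file: java.io.File): Unit = {
    if (file.isDirectory) file.listFiles().foreach(deleteRecursively)
    file.delete()
  }

  @Before
  def before(): Unit = {
    // Wipe anything left behind by a previous run so the query starts from scratch.
    checkpointDir.listFiles().foreach(deleteRecursively)
  }

  @Test
  def demoCustomSink(): Unit = {
    implicit val sqlContext = spark.sqlContext
    import spark.implicits._

    val input = MemoryStream[String]
    val doubled = input.toDS().map(x => x + " " + x)

    val query = doubled.writeStream
      .queryName("testCustomSinkBasic")
      .format("com.knockdata.spark.highcharts.demo.CustomSinkProvider")
      .start()

    input.addData("init")
    input.addData("hi")
    query.processAllAvailable()
    query.stop()
  }
}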
