I am trying to write a simple test case for Spark Structured Streaming. The code is inspired by holdenk's examples on GitHub.
Here is the CustomSink code:
case class CustomSink(func: DataFrame => Unit)
  extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    func(data)
  }
}

class CustomSinkProvider extends StreamSinkProvider {
  def func(df: DataFrame): Unit = {
    df.show(5)
  }

  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): CustomSink = {
    new CustomSink(func)
  }
}
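For reference, a minimal sketch of the imports this snippet relies on, assuming Spark 2.0.x (Sink and StreamSinkProvider are internal/experimental APIs at that version):

// imports assumed for the sink code above (Spark 2.0.x)
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode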
I try to run it in a test case with MemoryStream:
@Test
def demoCustomSink: Unit = {
  val input = MemoryStream[String]
  val doubled = input.toDS().map(x => x + " " + x)
  // input.addData("init")

  val query = doubled.writeStream
    .queryName("testCustomSinkBasic")
    .format("com.knockdata.spark.highcharts.demo.CustomSinkProvider")
    .start()

  input.addData("hi")
  query.processAllAvailable()
}
Without the line input.addData("init"), the query terminates with the following error:
2016-10-12 03:48:37 ERROR StreamExecution :91 - Query testCustomSinkBasic terminated with error
java.lang.RuntimeException: No data selected!
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.streaming.MemoryStream$$anonfun$getBatch$4.apply(memory.scala:110)
at org.apache.spark.sql.execution.streaming.MemoryStream$$anonfun$getBatch$4.apply(memory.scala:110)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.MemoryStream.getBatch(memory.scala:109)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:332)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:329)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:329)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:184)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:120)
If I uncomment the line input.addData("init"), the test case runs successfully and no error is reported. However, the init value never reaches the sink; only the hi hi value shows up. Why is that, and how can I fix it?
There is a checkpoint mechanism working in the background. If there is leftover data in the checkpoint directory, the query fails with this error.
Create a helper method that clears the directory, using code like the following:
import java.nio.file.Files

val checkpointPath = Files.createTempDirectory("query")
val checkpointDir = checkpointPath.toFile
checkpointDir.deleteOnExit()

def deleteRecursively(file: java.io.File): Unit = {
  if (file.isDirectory) {
    file.listFiles().foreach(deleteRecursively)
    file.delete()
  } else {
    file.delete()
  }
}

def clearCheckpointDir(): Unit = {
  checkpointDir.listFiles().foreach(deleteRecursively)
}
lazy val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("test")
  .config("spark.sql.streaming.checkpointLocation",
    checkpointDir.getAbsolutePath)
  .getOrCreate()
Then, in the test case with the custom sink, I added the following code:
@Before
def before: Unit = {
  clearCheckpointDir()
}
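Putting it together, here is a rough sketch of how the whole test class can be wired up, assuming Spark 2.0.x and JUnit. The CustomSinkTest class name and the private modifiers are my own; the implicit sqlContext and the spark.implicits._ import are there because MemoryStream and Dataset.map need an implicit SQLContext and encoders in scope. The helpers mirror the ones shown above.

import java.nio.file.Files
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.junit.{Before, Test}

class CustomSinkTest {
  // temp checkpoint directory plus the clean-up helpers from the answer above
  private val checkpointDir = {
    val dir = Files.createTempDirectory("query").toFile
    dir.deleteOnExit()
    dir
  }

  private def deleteRecursively(file: java.io.File): Unit = {
    if (file.isDirectory) file.listFiles().foreach(deleteRecursively)
    file.delete()
  }

  private def clearCheckpointDir(): Unit =
    checkpointDir.listFiles().foreach(deleteRecursively)

  private lazy val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("test")
    .config("spark.sql.streaming.checkpointLocation", checkpointDir.getAbsolutePath)
    .getOrCreate()

  // MemoryStream[String] needs an implicit SQLContext in scope
  private implicit lazy val sqlContext: SQLContext = spark.sqlContext

  @Before
  def before(): Unit = clearCheckpointDir()

  @Test
  def demoCustomSink(): Unit = {
    import spark.implicits._ // brings the String encoder needed by MemoryStream and map

    val input = MemoryStream[String]
    val doubled = input.toDS().map(x => x + " " + x)

    val query = doubled.writeStream
      .queryName("testCustomSinkBasic")
      .format("com.knockdata.spark.highcharts.demo.CustomSinkProvider")
      .start()

    input.addData("hi")
    query.processAllAvailable()
    query.stop()
  }
}

With the checkpoint directory cleared before each test, the query starts from a fresh checkpoint instead of tripping over offsets left behind by a previous run.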