我正在尝试用自定义接收器编写一个Spark流应用程序。我应该通过提供具有预定义间隔的随机值来模拟实时输入数据。(简化的)接收器如下所示,根据Spark Streaming应用程序代码如下:
class SparkStreamingReceiver extends Actor with ActorHelper {
private val random = new Random()
override def preStart = {
context.system.scheduler.schedule(500 milliseconds, 1000 milliseconds)({
self ! ("string", random.nextGaussian())
})
}
override def receive = {
case data: (String, Double) => {
store[(String, Double)](data)
}
}
}
val conf: SparkConf = new SparkConf()
conf.setAppName("Spark Streaming App")
.setMaster("local")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
val randomValues: ReceiverInputDStream[(String, Double)] =
ssc.actorStream[(String,Double)](Props(new SparkStreamingReceiver()), "Receiver")
randomValues.saveAsTextFiles("<<OUTPUT_PATH>>/randomValues")
运行此代码,我看到接收器正在工作(存储项目,接收到单个日志条目)。但是,saveAsTextFiles
永远不会输出值。
我可以通过将master更改为使用两个线程(local[2]
)来解决这个问题,但如果我注册了接收器的另一个实例(我打算这样做),它就会重新出现。更具体地说,我需要注册至少一个比我的自定义接收器数量多的线程才能获得任何输出。
在我看来,工作线程好像被接收器挂起了。
有人能解释一下这种影响吗,以及可能如何修复我的代码吗?
每个接收器都使用一个计算槽。因此,2个接收机将需要2个计算时隙。如果所有的计算时隙都被接收机占用,那么就没有剩余的时隙来处理数据。这就是为什么具有1个接收器的"本地"模式和具有2个接收器的"本地[2]"会暂停处理。