如何在AMQ主题中没有数据可读时停止流



我正在使用火花流从AMQ读取。我希望流停止时,没有数据留在消息队列中。我创建了一个连接到AMQ主题并开始读取数据的自定义接收器,但是工作人员如何告诉驱动程序没有数据,以便它可以停止流。

class CustomReceiver(brokerURL, topic, ...){
def onStart() {
new Thread("AMQ Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {}
private def receive() {
activeMQStream = new ActiveMQStream(broker, topic, ...)
val topicSubscriber = activeMQStream.getTopicSubscriber()
while(!isStopped && !ActiveMQReceiver.stop){
val message = topicSubscriber.receive(timeOutInMilliseconds)
if (message != null && message.isInstanceOf[TextMessage]) {
val textMessage = message.asInstanceOf[TextMessage];
val text = textMessage.getText();
store(text)
println("ActiveMQReceiver: there is data from AMQ ....")
} else {
ActiveMQReceiver.stop = true
println("ActiveMQReceiver: No more data from AMQ .....")
}
}
def checkStatus(): Boolean ={
ActiveMQReceiver.stop
}
}
object ActiveMQReceiver{
@volatile var stop: Boolean = false
}

正如你在上面看到的,当没有数据可读时,我试图将停止标志设置为true,但是当我运行以下命令时,标志总是False,在搜索后,我发现工作人员不共享变量。我试着用Accumulator替换它,但那也不起作用。

var ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val customReceiver = new CustomReceiver(brokerURL, topic, ...)
val stream: DStream[String] = ssc.receiverStream(customReceiver)
var driverList = List[String]()
stream.foreachRDD { rdd =>
if(rdd.count() > 0){
val fromWorker = rdd.collect().toList
driverList = driverList:::fromWorker
}
} 
var stopFlag = false
var isStopped = false
val checkIntervalMillis = 10000
while (!isStopped) {
isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
println("Check if stop flag was raised")
stopFlag = customReceiver.checkStatus()
if (!isStopped && stopFlag) {
var seq = driverList.toSeq
import spark.implicits._
val df = seq.toDS()
println("Request to stop")
ssc.stop(false, true)
}
}

依赖于receive()返回null来表示没有剩余数据,这在生产中将是危险的。这种方法消除了任何自我修复和故障转移支持,并引入了一个定时/竞争条件,你可能会"不走运"。作为一种替代方法,请考虑使用Message Groups,并将流中最后一条消息的标头设置为使用定义良好的消息来发出信号。

消息组

最新更新