如何执行kafka消费者应用程序的功能测试



我想测试一个使用Kafka消息并将其写入日志的应用程序。以下是它在类似标量的伪代码中的近似表示:

import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import java.util.Properties
import java.util.HashMap
object ConsumerApp extends App {
val topic = new HashMap[String, Integer]()
topic.put("test", 1)
val logger = LoggerFactory.getLogger(getClass().getName())
val messageStream = Consumer
.createJavaConsumerConnector(new ConsumerConfig(new Properties()))
.createMessageStreams(topic)
.get(topic).get(0)
for (message <- messageStream) {
val gotMessage = new String(message.message())
logger.info(gotMessage)
}
}

我脑海中的测试场景如下:

  • Kafka服务器启动。

  • 应用程序启动并连接到Kafka服务器,开始侦听特定主题的消息。

  • 将向主题发送一条消息。

  • 应用程序消耗消息并将其记录下来。

以下是类似标量的伪代码中的测试草案:

import uk.org.lidalia.slf4jtest.TestLoggerFactory;
import uk.org.lidalia.slf4jtest.LoggingEvent.info;
abstract class UnitSpec extends FlatSpec with Matchers with EmbeddedKafka {
}
class ConsumerAppSpec extends UnitSpec {
"ConsumerApp" should "consume and log messages from Kafka on specific topic" in {
withRunningKafka {
val consumer = ConsumerApp
// interecept logger to be able to test that the kafka message is logged
val logger = TestLoggerFactory.getTestLogger(consumer.getClass)
// start the application, but beforehand do something to prevent it infinitely blocking
???
consumer.main(Array())  
// publish a test message    
publishStringMessageToKafka("test", "TEST")
// Confirm that the message has been properly logged
???
}
EmbeddedKafka.stop()
}
}

我的问题是测试代码的前三个问号。如果我执行main((方法,它不会终止,从而阻止执行测试的剩余部分。

让它发挥作用。。。

object ConsumerApp extends App {
def doStuff() {
val topic = ...
// more stuff ...
}
doStuff()
}
class ConsumerAppSpec {
// ...
consumer.doStuff()
// ...
publishMessage() 
}

更新

我想,我误解了这个问题。与其说"不要调用main",我应该说不要阻塞线程:(

class ConsumerAppSpec {
...
val futureResult = Future(consumer.doStuff)
...
publishMessage()
}

最新更新