我需要在kafka应用程序上执行单元测试,避免使用第三方库。
我现在的问题是,我想在测试之间清除所有的主题,但我不知道怎么做。
这是我的临时解决方案:提交每个测试后产生的每个消息,并将所有测试消费者放在同一个消费者组中。
override protected def afterEach():Unit={
val cleanerConsumer= newConsumer(Seq.empty)
val topics=cleanerConsumer.listTopics()
println("pulisco")
cleanerConsumer.subscribe(topics.keySet())
cleanerConsumer.poll(100)
cleanerConsumer.commitSync()
cleanerConsumer.close()
}
这个行不通,我不知道为什么。
例如,当我在测试中创建一个新的消费者时,messages
包含在前一个测试中产生的消息。
val consumerProbe = newConsumer(SMSGatewayTopic)
val messages = consumerProbe.poll(1000)
我该如何解决这个问题?
你也可以在你的测试源中嵌入一个Kafka/Zookeeper实例,以便对这些孤立的服务有更多的控制。
trait Kafka { self: ZooKeeper =>
Kafka.start()
}
object Kafka {
import org.apache.hadoop.fs.FileUtil
import kafka.server.KafkaServer
@volatile private var started = false
lazy val logDir = java.nio.file.Files.createTempDirectory("kafka-log").toFile
lazy val kafkaServer: KafkaServer = {
val config = com.typesafe.config.ConfigFactory.
load(this.getClass.getClassLoader)
val (host, port) = {
val (h, p) = config.getString("kafka.servers").span(_ != ':')
h -> p.drop(1).toInt
}
val serverConf = new kafka.server.KafkaConfig({
val props = new java.util.Properties()
props.put("port", port.toString)
props.put("broker.id", port.toString)
props.put("log.dir", logDir.getAbsolutePath)
props.put(
"zookeeper.connect",
s"localhost:${config getInt "test.zookeeper.port"}"
)
props
})
new KafkaServer(serverConf)
}
def start(): Unit = if (!started) {
try {
kafkaServer.startup()
started = true
} catch {
case err: Throwable =>
println(s"fails to start Kafka: ${err.getMessage}")
throw err
}
}
def stop(): Unit = try {
if (started) kafkaServer.shutdown()
} finally {
FileUtil.fullyDelete(logDir)
}
}
trait ZooKeeper {
ZooKeeper.start()
}
object ZooKeeper {
import java.nio.file.Files
import java.net.InetSocketAddress
import org.apache.hadoop.fs.FileUtil
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.ServerCnxnFactory
@volatile private var started = false
lazy val logDir = Files.createTempDirectory("zk-log").toFile
lazy val snapshotDir = Files.createTempDirectory("zk-snapshots").toFile
lazy val (zkServer, zkFactory) = {
val srv = new ZooKeeperServer(
snapshotDir, logDir, 500
)
val config = com.typesafe.config.ConfigFactory.
load(this.getClass.getClassLoader)
val port = config.getInt("test.zookeeper.port")
srv -> ServerCnxnFactory.createFactory(
new InetSocketAddress("localhost", port), 1024
)
}
def start(): Unit = if (!zkServer.isRunning) {
try {
zkFactory.startup(zkServer)
started = true
while (!zkServer.isRunning) {
Thread.sleep(500)
}
} catch {
case err: Throwable =>
println(s"fails to start ZooKeeper: ${err.getMessage}")
throw err
}
}
def stop(): Unit = try {
if (started) zkFactory.shutdown()
} finally {
try { FileUtil.fullyDelete(logDir) } catch { case _: Throwable => () }
FileUtil.fullyDelete(snapshotDir)
}
}
测试类可以extends Kafka with ZooKeeper
以确保可用。
如果测试JVM没有分叉,可以使用SBT testOptions in Test
设置中的Tests.Cleanup
来停止测试后的嵌入式服务。
我建议您在测试之前重新创建所有主题。例如,kafka测试创建/删除主题的方式如下:
Kafka repository on GitHub