我在Kotlin使用反应式框架编写了一个KafkaConsumer,问题是整个项目结构都是基于Kotlin协程的,现在kafka consumer遵循Flux发布程序管道
我让它与runBlocking
一起工作,但我知道在我们的项目中使用阻塞代码不是一个好主意
我尝试使用@KafkaListener
(添加挂起修饰符时失败(,
import com.github.avrokotlin.avro4k.Avro
import kotlinx.coroutines.runBlocking
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.CommandLineRunner
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import org.springframework.stereotype.Component
@Component
class KafkaConsumer(
val slackNotificationService: SlackNotificationService,
val consumerTemplate: ReactiveKafkaConsumerTemplate<String, GenericRecord>
) : CommandLineRunner {
suspend fun sendNotification(record: ConsumerRecord<String, GenericRecord>) {
val tagNotification = Avro.default.fromRecord(TagNotification.serializer(), record.value())
slackNotificationService.notifyUsers(tagNotification)
}
override fun run(vararg args: String?) {
consumerTemplate
.receiveAutoAck()
.subscribe {
runBlocking {
sendNotification(it)
}
}
}
}
我可以成功地接收kafka消息,项目的所有其他部分都运行良好,但我无法在这里创建这个非阻塞桥,
有人知道是否有更好的方法来处理这个问题吗?
提前感谢:(
如果您想异步调用sendNotification()
,那么创建一个CoroutineScope
并用它启动协同程序:
class KafkaConsumer(
...
private val coroutineScope = CoroutineScope(Dispatchers.Default)
...
.subscribe {
coroutineScope.launch {
sendNotification(it)
}
}
如果KafkaConsumer
可能被破坏/关闭,则建议在发生时调用coroutineScope.cancel()
。