是否可以在无需使用runBlocking进行阻塞的情况下,在反应式代码和kotlin协同程序之间架起桥梁



我在Kotlin使用反应式框架编写了一个KafkaConsumer,问题是整个项目结构都是基于Kotlin协程的,现在kafka consumer遵循Flux发布程序管道

我让它与runBlocking一起工作,但我知道在我们的项目中使用阻塞代码不是一个好主意

我尝试使用@KafkaListener(添加挂起修饰符时失败(,

import com.github.avrokotlin.avro4k.Avro
import kotlinx.coroutines.runBlocking
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.CommandLineRunner
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import org.springframework.stereotype.Component
@Component
class KafkaConsumer(
val slackNotificationService: SlackNotificationService,
val consumerTemplate: ReactiveKafkaConsumerTemplate<String, GenericRecord>
) : CommandLineRunner {
suspend fun sendNotification(record: ConsumerRecord<String, GenericRecord>) {
val tagNotification = Avro.default.fromRecord(TagNotification.serializer(), record.value())
slackNotificationService.notifyUsers(tagNotification)
}
override fun run(vararg args: String?) {
consumerTemplate
.receiveAutoAck()
.subscribe {
runBlocking {
sendNotification(it)
}
}
}
}

我可以成功地接收kafka消息,项目的所有其他部分都运行良好,但我无法在这里创建这个非阻塞桥,

有人知道是否有更好的方法来处理这个问题吗?

提前感谢:(

如果您想异步调用sendNotification(),那么创建一个CoroutineScope并用它启动协同程序:

class KafkaConsumer(
...
private val coroutineScope = CoroutineScope(Dispatchers.Default)
...
.subscribe {
coroutineScope.launch { 
sendNotification(it)
}
}

如果KafkaConsumer可能被破坏/关闭,则建议在发生时调用coroutineScope.cancel()

最新更新