我想做的是:在插件中读取调用正文,如果正文包含特定内容,请停止对呼叫的进一步处理并立即应答。
这是我的代码,它执行我需要它执行的操作,但它也会在引擎盖下生成java.util.concurrent.CancellationException: Parent job is Cancelling
。据我了解,在这种情况下,我需要停止"正常"的管道流,但是该怎么做呢?
@KtorDsl
class TestPluginConfig
val TestPlugin: RouteScopedPlugin<TestPluginConfig> = createRouteScopedPlugin(
"TestPlugin",
::TestPluginConfig
) {
on(ReceiveBytes) { call, body ->
val bodyBytes = body.readFully()
if (String(bodyBytes) == "1")
call.respond(HttpStatusCode.NotAcceptable, "You wish")
ByteReadChannel(bodyBytes)
}
}
private object ReceiveBytes : Hook<suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel> {
override fun install(
pipeline: ApplicationCallPipeline,
handler: suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel
) {
pipeline.receivePipeline.intercept(ApplicationReceivePipeline.Before) { body ->
if (body !is ByteReadChannel) return@intercept
val newBody = handler(call, body)
proceedWith(newBody)
}
}
}
private suspend fun ByteReadChannel.readFully(): ByteArray {
var array = ByteArray(0)
while (!this.isClosedForRead) {
val packet = this.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
while (!packet.isEmpty) {
array += packet.readBytes()
}
}
return array
}
this.call.receiveText()
触发ApplicationReceivePipeline
的执行,在ReceiveBytes
钩子中被截获。若要避免执行路由的处理程序块,需要将receiveText()
调用放在其开头,以便在执行其任何代码之前取消处理程序的协程执行。您可以将处理程序的coroutineContext
保存到调用的Attributes
,以取消钩子处理程序中处理程序的协程。
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.locations.*
import io.ktor.server.locations.post
import io.ktor.server.netty.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.util.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlin.text.String
import io.ktor.server.plugins.callid.*
import io.ktor.server.plugins.callloging.*
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlin.coroutines.CoroutineContext
@OptIn(KtorExperimentalLocationsAPI::class)
object Paths {
@Location("/123")
class Root
}
private val contextKey = AttributeKey<CoroutineContext>("context")
@OptIn(KtorExperimentalLocationsAPI::class)
fun main() {
embeddedServer(Netty, port = 8082) {
install(Locations)
install(CallLogging)
install(TestPlugin)
routing {
trace {
application.log.trace(it.buildText())
}
post<Paths.Root> {
call.attributes.put(contextKey, coroutineContext) // Save current coroutine context
val text = this.call.receiveText()
// this async logic is being started even though the call is already answered
// it is much more complicated in my original code, and it's being killed with `java.util.concurrent.CancellationException: Parent job is Cancelling`
async {
println("start delay")
delay(10000)
println("end delay")
}
call.respondText(text)
}
}
}.start(wait = true)
}
@KtorDsl
class TestPluginConfig
val TestPlugin: RouteScopedPlugin<TestPluginConfig> = createRouteScopedPlugin(
"TestPlugin",
::TestPluginConfig
) {
on(ReceiveBytes) { call, body ->
val bodyBytes = body.readFully()
if (String(bodyBytes) == "1") {
call.attributes[contextKey].cancel() // Cancel a route's handler children jobs
call.respond(HttpStatusCode.NotAcceptable, "You wish")
}
ByteReadChannel(bodyBytes)
}
}
private object ReceiveBytes : Hook<suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel> {
override fun install(
pipeline: ApplicationCallPipeline,
handler: suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel
) {
pipeline.receivePipeline.intercept(ApplicationReceivePipeline.Before) { body ->
if (body !is ByteReadChannel) return@intercept
val newBody = handler(call, body)
proceedWith(newBody)
}
}
}
private suspend fun ByteReadChannel.readFully(): ByteArray {
var array = ByteArray(0)
while (!this.isClosedForRead) {
val packet = this.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
while (!packet.isEmpty) {
array += packet.readBytes()
}
}
return array
}
多亏了@AlekseiTirman,我最终得到了一个截然不同的解决方案。
要点:
- 自定义开机自检功能
val callIsProcessedAttributeKey: AttributeKey<Boolean> = AttributeKey("CallIsProcessedAttrKey")
@KtorExperimentalLocationsAPI
inline fun <reified T : Any, reified Q : Serializable> Route.typedPost(
noinline handler: suspend PipelineContext<Unit, ApplicationCall>.(Q) -> Unit
): Route =
post<T> {
// will start ApplicationReceivePipeline
val obj = call.receive<Q>()
val callShouldBeHandled = !Try { call.attributes[callIsProcessedAttributeKey] }
.getOrElse(false)
// decides if we want to proceed with the request or it is already answered with the plugin
if (callShouldBeHandled)
handler(obj)
}
- 在插件中回答之前,我将这个调用属性设置为 true
call.attributes.put(callIsProcessedAttributeKey, true)
call.respond(HttpStatusCode.NotAcceptable, "You wish")