首先是问题,下面是上下文。如何基于使用异步 gRPC python 服务器从客户端取消 RPC来执行一些服务器端操作(例如,清理)?
在我的微服务中,我有一个asyncio
gRPC 服务器,其主要 RPC 是双向流。
在客户端(也使用 asyncio),当我取消某些内容时,它会引发一个asyncio.CancelledError
,该被grpc
核心捕获而不是重新引发:
https://github.com/grpc/grpc/blob/master/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi#L679
except asyncio.CancelledError:
_LOGGER.debug('RPC cancelled for servicer method [%s]', _decode(rpc_state.method()))
因此,我不能依赖于在自己的代码中捕获asyncio.CancelledError
,因为它是事先捕获的,而不是重新提出的。
共享上下文应该包含有关 RPC 是否已在客户端取消的信息,方法是从 RPC 调用调用.cancel()
,并能够通过调用.cancelled()
来查看它是否已取消:
https://grpc.github.io/grpc/python/grpc_asyncio.html#shared-context
抽象取消()
Cancels the RPC. Idempotent and has no effect if the RPC has already terminated. Returns A bool indicates if the cancellation is performed or not. Return type bool
摘要已取消()
Return True if the RPC is cancelled. The RPC is cancelled when the cancellation was requested with cancel(). Returns A bool indicates whether the RPC is cancelled or not. Return type bool
但是,此共享上下文不会附加到 gRPC 生成的代码提供给服务器端 RPC 的context
变量。(我不能跑context.cancelled()
或context.add_done_callback
;他们不在场)
所以,再次,问题:我如何根据从客户端取消具有异步 gRPC python 服务器的 RPC来执行一些服务器端操作(例如,清理)?
感谢您的帖子。我们已意识到此问题,并且添加对这两种方法的支持已在我们的路线图中。
对于短期解决方案,您可以使用 try-catch 和修饰器。客户端取消被视为方法处理程序中的asyncio.CancelledError
。下面是一个修改后的helloworld示例:
服务器代码:
class Greeter(helloworld_pb2_grpc.GreeterServicer):
async def SayHello(
self, request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext) -> helloworld_pb2.HelloReply:
try:
await asyncio.sleep(4)
except asyncio.CancelledError:
print('RPC cancelled')
raise
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
客户端代码:
async def run() -> None:
async with grpc.aio.insecure_channel('localhost:50051') as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
call = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
await asyncio.sleep(2)
call.cancel()
print(await call.code())