Python GRPC从客户端取消单流呼叫



使用python grpc,我希望能够从客户端取消一个长期运行的单式通话,当设置threading.Event时。

def application(stub: StreamsStub, event: threading.Event):
    stream = stub.Application(ApplicationStreamRequest())
    try:
        for resp in stream:
            print(resp)
    except grpc.RpcError as e:
        print(e)

目前是我使用channel.close()方法取消流,但这当然会关闭所有连接,而不仅仅是此流。

有人可以建议我如何使用事件取消流迭代器?谢谢

以下是GRPC Unarystream调用的一些代码。服务器发送无限数量的答复,使客户决定何时停止接收。

而不是使用计数器,您可以让线程关闭并进行一些工作,并在调用CANCAL()之前检查了一个事件,而不是检查计数器。

注意:使用Python 2.7

protofile:

syntax = "proto3";
package my_package;
service HeartBeat {
    rpc Beats(Counter) returns (stream Counter) {}
}
message Counter {
    int32 counter = 1;
}

客户端:

from __future__ import print_function
import grpc
import heartbeat_pb2
import heartbeat_pb2_grpc

def get_beats(stub, channel):
    try:
        result_iterator = stub.Beats(heartbeat_pb2.Counter(counter=i))
        for result in result_iterator:
            print("Count: {}".format(result.counter))
            if result.counter >= 3: # We only wants 3 'beats'
                result_iterator.cancel()
    except grpc.RpcError as rpc_error:
        if rpc_error.code() == grpc.StatusCode.CANCELLED:
            pass # Otherwise, a traceback is printed

def run():
    with grpc.insecure_channel('localhost:9999') as channel:
        stub = heartbeat_pb2_grpc.HeartBeatStub(channel)
        get_beats(stub, channel)

if __name__ == '__main__':
    run()

服务器:

from concurrent import futures
import grpc
from proto_generated import heartbeat_pb2
from proto_generated import heartbeat_pb2_grpc
import time

class HeartBeatServicer(heartbeat_pb2_grpc.HeartBeatServicer):
    pass

    def Beats(self, request, context):
        # Not required, only to show sending the server a message
        print("Beats: {}".format(request.counter))
        def response_message():
            i = 0
            while context.is_active():
                print("Sending {}".format(i))
                response = heartbeat_pb2.Counter(counter=i)
                i += 1
                time.sleep(1)  # Simulate doing work
                yield response
        return response_message()

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    heartbeat_pb2_grpc.add_HeartBeatServicer_to_server(
        HeartBeatServicer(), server)
    server.add_insecure_port('[::]:9999')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

RPC调用返回的_Rendezvous对象实现grpc.RpcErrorgrpc.Futuregrpc.Call,因此取消流与调用stream.cancel(来自grpc.Future接口)

一样简单

相关内容

  • 没有找到相关文章

最新更新