使用python grpc,我希望能够从客户端取消一个长期运行的单式通话,当设置threading.Event
时。
def application(stub: StreamsStub, event: threading.Event):
stream = stub.Application(ApplicationStreamRequest())
try:
for resp in stream:
print(resp)
except grpc.RpcError as e:
print(e)
目前是我使用channel.close()
方法取消流,但这当然会关闭所有连接,而不仅仅是此流。
有人可以建议我如何使用事件取消流迭代器?谢谢
以下是GRPC Unarystream调用的一些代码。服务器发送无限数量的答复,使客户决定何时停止接收。
而不是使用计数器,您可以让线程关闭并进行一些工作,并在调用CANCAL()之前检查了一个事件,而不是检查计数器。
注意:使用Python 2.7
protofile:
syntax = "proto3";
package my_package;
service HeartBeat {
rpc Beats(Counter) returns (stream Counter) {}
}
message Counter {
int32 counter = 1;
}
客户端:
from __future__ import print_function
import grpc
import heartbeat_pb2
import heartbeat_pb2_grpc
def get_beats(stub, channel):
try:
result_iterator = stub.Beats(heartbeat_pb2.Counter(counter=i))
for result in result_iterator:
print("Count: {}".format(result.counter))
if result.counter >= 3: # We only wants 3 'beats'
result_iterator.cancel()
except grpc.RpcError as rpc_error:
if rpc_error.code() == grpc.StatusCode.CANCELLED:
pass # Otherwise, a traceback is printed
def run():
with grpc.insecure_channel('localhost:9999') as channel:
stub = heartbeat_pb2_grpc.HeartBeatStub(channel)
get_beats(stub, channel)
if __name__ == '__main__':
run()
服务器:
from concurrent import futures
import grpc
from proto_generated import heartbeat_pb2
from proto_generated import heartbeat_pb2_grpc
import time
class HeartBeatServicer(heartbeat_pb2_grpc.HeartBeatServicer):
pass
def Beats(self, request, context):
# Not required, only to show sending the server a message
print("Beats: {}".format(request.counter))
def response_message():
i = 0
while context.is_active():
print("Sending {}".format(i))
response = heartbeat_pb2.Counter(counter=i)
i += 1
time.sleep(1) # Simulate doing work
yield response
return response_message()
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
heartbeat_pb2_grpc.add_HeartBeatServicer_to_server(
HeartBeatServicer(), server)
server.add_insecure_port('[::]:9999')
server.start()
server.wait_for_termination()
if __name__ == '__main__':
serve()
RPC调用返回的_Rendezvous
对象实现grpc.RpcError
,grpc.Future
和grpc.Call
,因此取消流与调用stream.cancel
(来自grpc.Future
接口)