假设我想创建一个类似聊天的应用程序。客户端可以将文本发送到服务器,反之亦然。文本交换的顺序可以是任意的。
服务器依赖于控制服务器响应流的另一个流。GRPC 流作为 python 生成器公开。服务器现在如何同时等待客户端输入和另一个流的输入?通常人们会使用 select(( 之类的东西,但这里我们有生成器。
我有一些示例代码可以实现所需的行为,但在客户端和服务器端需要一个额外的线程。如何在没有线程的情况下获得相同的结果?
原始:
syntax = 'proto3';
service Scenario {
rpc Chat(stream DPong) returns (stream DPong) {}
}
message DPong {
string name = 1;
}
服务器:
import random
import string
import threading
import grpc
import scenario_pb2_grpc
import scenario_pb2
import time
from concurrent import futures
class Scenario(scenario_pb2_grpc.ScenarioServicer):
def Chat(self, request_iterator, context):
def stream():
while 1:
time.sleep(1)
yield random.choice(string.ascii_letters)
output_stream = stream()
def read_incoming():
while 1:
received = next(request_iterator)
print('received: {}'.format(received))
thread = threading.Thread(target=read_incoming)
thread.daemon = True
thread.start()
while 1:
yield scenario_pb2.DPong(name=next(output_stream))
if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
scenario_pb2.add_ScenarioServicer_to_server(
Scenario(), server)
server.add_insecure_port('[::]:50052')
server.start()
print('listening ...')
while 1:
time.sleep(1)
客户
import threading
import grpc
import time
import scenario_pb2_grpc, scenario_pb2
def run():
channel = grpc.insecure_channel('localhost:50052')
stub = scenario_pb2_grpc.ScenarioStub(channel)
print('client connected')
def stream():
while 1:
yield scenario_pb2.DPong(name=input('$ '))
input_stream = stub.Chat(stream())
def read_incoming():
while 1:
print('received: {}'.format(next(input_stream).name))
thread = threading.Thread(target=read_incoming)
thread.daemon = True
thread.start()
while 1:
time.sleep(1)
if __name__ == '__main__':
print('client starting ...')
run()
目前,
如果不花费您正在花费的线程,就无法做到这一点。我们正在考虑实现增强功能,使实现能够避免使用另一个线程,但最早需要几个月的时间。