GRPC streaming select (python)



假设我想创建一个类似聊天的应用程序。客户端可以将文本发送到服务器,反之亦然。文本交换的顺序可以是任意的。

服务器

依赖于控制服务器响应流的另一个流。GRPC 流作为 python 生成器公开。服务器现在如何同时等待客户端输入和另一个流的输入?通常人们会使用 select(( 之类的东西,但这里我们有生成器。

我有一些示例代码可以实现所需的行为,但在客户端和服务器端需要一个额外的线程。如何在没有线程的情况下获得相同的结果?

原始:

syntax = 'proto3';
service Scenario {
    rpc Chat(stream DPong) returns (stream DPong) {}
}
message DPong {
    string name = 1;
}

服务器:

import random
import string
import threading
import grpc
import scenario_pb2_grpc
import scenario_pb2
import time
from concurrent import futures
class Scenario(scenario_pb2_grpc.ScenarioServicer):
    def Chat(self, request_iterator, context):
        def stream():
            while 1:
                time.sleep(1)
                yield random.choice(string.ascii_letters)
        output_stream = stream()
        def read_incoming():
            while 1:
                received = next(request_iterator)
                print('received: {}'.format(received))
        thread = threading.Thread(target=read_incoming)
        thread.daemon = True
        thread.start()
        while 1:
            yield scenario_pb2.DPong(name=next(output_stream))

if __name__ == '__main__':
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    scenario_pb2.add_ScenarioServicer_to_server(
        Scenario(), server)
    server.add_insecure_port('[::]:50052')
    server.start()
    print('listening ...')
    while 1:
        time.sleep(1)

客户

import threading
import grpc
import time
import scenario_pb2_grpc, scenario_pb2

def run():
    channel = grpc.insecure_channel('localhost:50052')
    stub = scenario_pb2_grpc.ScenarioStub(channel)
    print('client connected')
    def stream():
        while 1:
            yield scenario_pb2.DPong(name=input('$ '))
    input_stream = stub.Chat(stream())
    def read_incoming():
        while 1:
            print('received: {}'.format(next(input_stream).name))
    thread = threading.Thread(target=read_incoming)
    thread.daemon = True
    thread.start()
    while 1:
        time.sleep(1)
if __name__ == '__main__':
    print('client starting ...')
    run()
目前,

如果不花费您正在花费的线程,就无法做到这一点。我们正在考虑实现增强功能,使实现能够避免使用另一个线程,但最早需要几个月的时间。

最新更新