如果从单独的线程调用,则 ResponseStream 和 RequestStream 永远不会继续



聊天示例(.Net Framework 4.x(包括交错读取和写入同一AsyncDuplexStreamingCall的代码。我更改了 .Net Core Greeter 示例以包含流式 RPC:

service GreetingService {
    rpc Greeting(HelloRequest) returns (HelloResponse);
    rpc StreamGreeting(stream HelloRequest) returns (stream HelloResponse);
}

然后我基本上复制了服务器的 echo 实现:

public override async Task StreamGreeting(/* ... */)
{
    while (await requestStream.MoveNext(CancellationToken.None))
    {
        var request = requestStream.Current;
        Console.WriteLine($"Stream Message: {request.Name}");
        await responseStream.WriteAsync(new HelloResponse { Greeting = "Stream Hello " + request.Name });
    }
    Console.WriteLine("Stream completed.");
}

最后,我尝试从单独的线程读取和写入此流:

var callResult = client.StreamGreeting(new CallOptions());
// Or: Task.Run
new Thread(async () =>
{
    await callResult.RequestStream.WriteAsync(request);
    await callResult.RequestStream.WriteAsync(request);
    await callResult.RequestStream.WriteAsync(request);
    await callResult.RequestStream.WriteAsync(request);
    await callResult.RequestStream.CompleteAsync();
}).Start();
new Thread(async () =>
{
    while (await callResult.ResponseStream.MoveNext(CancellationToken.None))
    {
        Console.WriteLine(callResult.ResponseStream.Current.Greeting);
    }
}).Start();

WriteAsync的第一次调用成功(服务器记录它收到消息(,但第二次调用永远不会返回/继续。 ResponseStream.MoveNext永远不会返回/继续。在主任务/线程上运行都不能解决问题。在主任务/线程上运行两者都有效,包括所有形式的交错调用。

我做错了什么,还是这是一个限制?

我可以重现您描述的结果,但只有在发出处理线程后立即包含GreeterClient示例中的行channel.ShutdownAsync().Wait()时。

然后,这会导致InvalidOperationException,因为在流式处理调用正在进行时关闭客户端存根。

若要确保在服务器上处理所有四个调用,您需要等待所有流请求和响应完成,然后再关闭:

  var callResult = client.StreamGreeting(new CallOptions());
  Task.WhenAll(new[]
  {
    Task.Run(async () =>
    {
      await callResult.RequestStream.WriteAsync(request);
      await callResult.RequestStream.WriteAsync(request);
      await callResult.RequestStream.WriteAsync(request);
      await callResult.RequestStream.WriteAsync(request);
      await callResult.RequestStream.CompleteAsync();
    }),
    Task.Run(async () =>
    {
      while (await callResult.ResponseStream.MoveNext(CancellationToken.None))
      {
        Console.WriteLine(callResult.ResponseStream.Current.Greeting);
      }
    })
  }).Wait();   

最新更新