聊天示例(.Net Framework 4.x(包括交错读取和写入同一AsyncDuplexStreamingCall
的代码。我更改了 .Net Core Greeter 示例以包含流式 RPC:
service GreetingService {
rpc Greeting(HelloRequest) returns (HelloResponse);
rpc StreamGreeting(stream HelloRequest) returns (stream HelloResponse);
}
然后我基本上复制了服务器的 echo 实现:
public override async Task StreamGreeting(/* ... */)
{
while (await requestStream.MoveNext(CancellationToken.None))
{
var request = requestStream.Current;
Console.WriteLine($"Stream Message: {request.Name}");
await responseStream.WriteAsync(new HelloResponse { Greeting = "Stream Hello " + request.Name });
}
Console.WriteLine("Stream completed.");
}
最后,我尝试从单独的线程读取和写入此流:
var callResult = client.StreamGreeting(new CallOptions());
// Or: Task.Run
new Thread(async () =>
{
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.CompleteAsync();
}).Start();
new Thread(async () =>
{
while (await callResult.ResponseStream.MoveNext(CancellationToken.None))
{
Console.WriteLine(callResult.ResponseStream.Current.Greeting);
}
}).Start();
对WriteAsync
的第一次调用成功(服务器记录它收到消息(,但第二次调用永远不会返回/继续。 ResponseStream.MoveNext
永远不会返回/继续。在主任务/线程上运行都不能解决问题。在主任务/线程上运行两者都有效,包括所有形式的交错调用。
我做错了什么,还是这是一个限制?
我可以重现您描述的结果,但只有在发出处理线程后立即包含GreeterClient
示例中的行channel.ShutdownAsync().Wait()
时。
然后,这会导致InvalidOperationException
,因为在流式处理调用正在进行时关闭客户端存根。
若要确保在服务器上处理所有四个调用,您需要等待所有流请求和响应完成,然后再关闭:
var callResult = client.StreamGreeting(new CallOptions());
Task.WhenAll(new[]
{
Task.Run(async () =>
{
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.WriteAsync(request);
await callResult.RequestStream.CompleteAsync();
}),
Task.Run(async () =>
{
while (await callResult.ResponseStream.MoveNext(CancellationToken.None))
{
Console.WriteLine(callResult.ResponseStream.Current.Greeting);
}
})
}).Wait();