Grpc-使用可观察的rx



我正试图通过GRPC流公开一个可观察对象。我的简化代码如下:

public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var result = new Result();
try
{
await Observable.ForEachAsync(async value =>
{
await responseStream.WriteAsync(value);
});
}
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
}

我收到以下错误:

W0123 14:30:59.709715Grpc.Core.Internal.ServerStreamingServerCallHandler2 Exception occured in handler. System.ArgumentException: Der Wert liegt außerhalb des erwarteten Bereichs. bei Grpc.Core.Internal.ServerStreamingServerCallHandler2.d__4.MoveNext()W0123 14:30:59.7732716 Grpc.Core.Server在处理RPC时发生异常。System.InvalidOperationException:Der Vorgang是aufgrund des这是一个很好的例子。beiGrpc.Core.Internal.AsyncCallServer2.SendStatusFromServerAsync(Status status, Metadata trailers, Tuple2(可选写入)beiGrpc.Core.Interal.ServerStreamingServerCallHandler `2.d_4.MoveNext()---Stapelüberwachung vom vorhergehenden Ort,一位名叫Ausnahme ausgelöst wurde的女士System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务任务)beiSystem.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务任务)bei Grpc.Core.Server.d_34.MoveNext()

您建议如何处理此问题?我想我需要在同一个线程中处理ForEachAsync。

使用gRPC流API,一次只允许写入一项。如果在上一个操作完成之前启动另一个WriteAsync()操作,则会出现异常。在从方法处理程序(本例中为Feed方法)返回之前,您还需要完成所有写入操作。一次只允许一次写入的原因是为了确保gRPC的流控制工作正常。

在您的情况下,Rx API似乎无法确保这一点,因此解决这一问题的一种方法是使用中间缓冲区。

这个怎么样?

public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var result = new Result();
try
{
await Observable.Scan(Task.CompletedTask, (preceding,value) =>
preceding.ContinueWith(_ => responseStream.WriteAsync(value))
);
}
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
}

我还没有测试过,但我认为它仍然有效。

有两个问题阻止OP的方法按预期工作,这两个问题都源于responseStream。WriteAsync期望一次只能挂起一个写操作。

  1. 订阅可观察对象时,每个"onNext"都将在引发通知的同一线程上进行处理。这意味着您可以强制执行一次写入一个字的规则。您可以使用ObserveOn方法来控制如何安排每个通知
var eventLoop = new EventLoopScheduler();
var o = myObservable.ObserveOn(eventLoop);
  1. ForEachAsync的工作方式与异步lambdas的工作方式不同。如果您将async/await放在这里,而是像这样调用WriteAsync,那么Write将在预期的上下文中执行
responseStream.WriteAsync<Response>(value).Wait();

下面是一个完整的例子

public override async Task Feed(Request request, IServerStreamWriter<Response> responseStream, ServerCallContext context)
{
var o = getMyObservable(request);
try
{
var eventLoop = new EventLoopScheduler(); //just for example, use whatever scheduler makes sense for you.
await o.ObserveOn(eventLoop).ForEachAsync<Response>(value =>
{
responseStream.WriteAsync(value).Wait();
}, 
context.CancellationToken);
}
catch (TaskCanceledException) { }
catch (Exception ex)
{
Log.Info("Session ended:" + ex);
}
Log.Info("Session ended normally");
}

相关内容

  • 没有找到相关文章

最新更新