我正在尝试为System.Net.WebSocket编写一个扩展方法,该方法将使用反应式扩展(Rx.NET(将其转换为IObserver。您可以看到下面的代码:
public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
// Wrap the web socket in an interface that's a little easier to manage
var webSocketMessageStream = new WebSocketMessageStream(webSocket);
// Create the output stream to the client
return Observer.Create<T>(
onNext: async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
onError: async error => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
onCompleted: async () => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
);
}
这段代码有效,但我担心在onNext,onError和onComplete的lambdas中使用async/await。我知道这会返回一个异步空 lambda,这是不受欢迎的(有时会导致我已经遇到过的问题(。
我一直在阅读互联网上的 Rx.NET 文档和博客文章,但我似乎找不到在 IObserver 中使用 async/await 方法的正确方法(如果有的话(。有没有正确的方法可以做到这一点?如果没有,那么我该如何解决此问题?
不接受async
方法。所以这里发生的事情是你正在使用async void
的即发即弃机制。问题是 onNext 消息将不再序列化。
不应在 订阅 中调用 async
方法,而应将其包装到管道中以允许 Rx 等待它。
对onError
和onCompleted
使用即发即弃,因为这些保证是Observable
最后调用的东西。请记住,与Observable
关联的资源可以在onError
后清理,并在完成之前onCompleted
返回。
我写了一个小的扩展方法来做这一切:
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)
{
return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()
.Subscribe(
e => { }, // empty
onError,
onCompleted);
}
首先,我们将async onNext
方法转换为Observable
,连接两个异步世界。这意味着将尊重onNext
内部的异步/等待。接下来,我们将方法包装到Defer
中,这样它就不会在创建时立即启动。相反,Concat
只会在前一个完成时才调用下一个延迟可观察的。
附言。我希望 Rx.Net 的下一个版本将在订阅中获得async
支持。