将IStreamResult转换为RxJs Observable



问题:
安装了@microsoft/signalr(用于ASP.NET Core的SignalR的Javascript/Typescript客户端(后,我注意到HubConnection定义了一个名为stream的方法,该方法用于调用服务器上的流式集线器方法(使用指定的名称和参数(并返回IStreamResult,该方法描述为:

是一个类似于Observable的API,但我们不希望用户将其混淆,因此我们对其进行重命名。有人可以如果他们愿意的话,可以很容易地将其调整到Rx接口中;接口";并免费获得扩展方法。这些方法实际上必须添加到对象中(JS中没有扩展方法!(。我们不想在核心库中依赖RxJS,所以我们复制了所需的最低逻辑,然后用户可以轻松地将其适应适当的RxJS可观察性,如果他们愿意的话。


我曾尝试使用from运算符将其转换为RxJs Observable,但结果发现Subscription类型(ISubscription(定义了dispose方法而不是unsubscribe
,如以下错误中所解释的:

类型"IStreamResult"不可分配给类型"Subscribeble"。"subscribe(…("返回的类型在这些类型之间不兼容。类型"ISubscription"中缺少属性"unsubscribe",但类型"Unsubscribed"中需要该属性


问题:
如何正确地将IStreamResult转换为Rxjs Observable


IStreamResult(可观测(:

export interface IStreamResult<T> {
subscribe(subscriber: IStreamSubscriber<T>): ISubscription<T>;
}

IStreamSubscriber(观察者(:

export interface IStreamSubscriber<T> {
/** A boolean that will be set by the {@link @microsoft/signalr.IStreamResult} when the stream is closed. */
closed?: boolean;
/** Called by the framework when a new item is available. */
next(value: T): void;
/** Called by the framework when an error has occurred.
*
* After this method is called, no additional methods on the {@link @microsoft/signalr.IStreamSubscriber} will be called.
*/
error(err: any): void;
/** Called by the framework when the end of the stream is reached.
*
* After this method is called, no additional methods on the {@link @microsoft/signalr.IStreamSubscriber} will be called.
*/
complete(): void;
}

I订阅(订阅(:

export interface ISubscription<T> {
/** Disconnects the {@link @microsoft/signalr.IStreamSubscriber} associated with this subscription from the stream. */
dispose(): void;
}

我不知道,以前从未使用过IStreamResult,但从这里开始?

最简单的方法似乎是订阅IStreamResult,然后将每个回调函数绑定到Observable上的Observer。这应该通过可观察的包装器以最少的工作量传递值/事件。

这个假定的IStreamResult在引擎盖下运行。例如,promise是热切的,而不是懒惰的,所以from(promise: Promise)在您订阅之前启动promise,并立即(或在它准备好的时候(发布结果。

如果你已经习惯了懒惰的执行可观察性,那么这可能会非常令人惊讶。我经常使用defer(() => promise:Promise),它只在订阅时启动promise。

如果IStreamResult有这样的非典型行为,可能需要一些挖掘才能弄清楚,因为下面的fromIStreamResult没有检查正确的行为。

旁白:不知道该怎么处理closed?: boolean。。。


/**
* Static operator to turn IStreamResult into Observable
**/
function fromIStreamResult<T>(input: IStreamResult<T>): Observable<T> {
return new Observable<T>(observer => {
subscription = input.subscribe({
next: observer.next.bind(observer),
error: observer.error.bind(observer),
complete: observer.complete.bind(observer)
});
return {
unsubscribe: subscription.dispose.bind(subscription)
}
})
}

然后,您可以像使用from(promise: Promise)一样使用fromIStreamResult(iStream: IStreamResult),并开始使用RxJS运算符进行转换。

最新更新