来自EventEmitter的热门和共享Observable



是否有办法从EventEmitter(或Angular 2 alpha 46/RxJS 5 alpha中可用的等效物)获得热可观测值?即如果我们在值被解析后订阅,则它会使用先前解析的值触发。类似于我们总是回报同样的承诺。

理想情况下,只使用Angular 2对象(我在某个地方读到一个轻RxJS稍后会嵌入以删除依赖项),否则导入RxJS就可以了。AsyncSubject似乎符合我的需求,但它在RxJS 5 alpha中不可用。

我尝试了以下操作,但没有成功(从未触发)。你知道如何使用它吗?

let emitter = new EventEmitter<MyObj>();
setTimeout(() => {emitter.next(new MyObj());});
this.observable = emitter;
return this.observable.share();

Full plunker在这里比较冷热

用例:只访问一些异步对象一次(例如,一系列合并/包装在EventEmitter中的HTTP调用),但将已解析的异步对象提供给订阅它的任何服务/组件,即使它们在解析后订阅(接收到HTTP响应)。

EDIT:问题不在于如何合并HTTP响应,而在于如何从EventEmitter或Angular 2 alpha 46/RxJS 5 alpha中获得(热?)可观察结果,该结果允许在检索/解决异步结果后进行订阅(HTTP只是异步起源的一个示例)myEventEmitter.share()不起作用(上面的cf plunker),尽管它可以使用HTTP返回的Observable(来自@Eric Martinez的cf plonker)。从Angular 2 alpha 46开始,.toRx()方法不再存在,EventEmitter本身就是可观察的对象

只要我们总是返回相同的promise对象,这就可以很好地处理promise。由于我们在HTTPAngular 2服务中引入了观察者,我希望避免混合promise和观察者(据说观察者比promise更强大,所以它应该允许使用promise做容易的事情)。

关于share()的规范(我还没有找到Angular 2使用的版本5 alpha的文档)-处理Angular 2 HTTP服务返回的Observable,而不是处理EventEmitter。

EDIT:澄清了为什么不使用HTTP返回的Observable,并补充说不直接使用RxJS会更好

编辑:更改了描述:关注的是多个订阅,而不是合并HTTP结果

谢谢!

您所描述的功能似乎不是冷可观察的,而是Rx.BehaviourSubject。请查看此处,了解有关Rxjs主题的解释:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/subjects.md.

我引用的是:

BehaviourSubject类似于ReplaySubject,只是它只存储它发布的最后一个值。BehaviourSubject在初始化时也需要一个默认值。当受试者尚未收到其他值时,会将此值发送给观察者。这意味着,除非Subject已经完成,否则所有订阅者在订阅时都会立即收到一个值。

Rx.AsyncSubject在行为上最接近承诺:

AsyncSubject类似于Replay和Behavior主题,但它只存储最后一个值,并且只在序列完成时发布。当源可观察对象是热的并且可能在任何观察者订阅之前完成时,您可以使用AsyncSubject类型。在这种情况下,AsyncSubjects仍然可以提供最后一个值,并将其发布给任何未来的订阅者。

还有两条评论:

  • 在您的plunker中:this._coldObservable = emitter.share();。使用share返回热可观察
  • EventEmitter实际上首先扩展了主题

更新:将EventEmitter包裹在Rx.Observable:周围

function toRx ( eventEmitter ) {
  return Rx.Observable.create(function ( observer ) {
    eventEmitter.subscribe(function listener ( value ) {observer.onNext(value)});
    // Ideally you also manage error and completion, if that makes sense with Angular2
    return function () {
      /* manage end of subscription here */
    };
  };
)
}

一旦你有了Rx.Observable,你就可以应用share()shareReplay(1),任何你想要的。

我敢打赌,Angular团队迟早会提出一个brigding函数,但如果你不想等待,你可以自己做。

ReplaySubject正在做我想要做的事情@robwormald在gitter上提供了一个工作示例,为了更好地演示,我做了一些修改。

公开HTTP响应:

import {Injectable} from 'angular2/angular2';
import {Http} from 'angular2/http';
import {ReplaySubject} from '@reactivex/rxjs/dist/cjs/Rx'
@Injectable()
export class PeopleService {
  constructor(http:Http) {
    this.people = new ReplaySubject(1);
    http.get('api/people.json')
      .map(res => res.json())
      .subscribe(this.people);
  }
}

多次订阅:

// ... annotations
export class App {
  constructor(peopleService:PeopleService) {
    people.subscribe(v => {
      console.log(v)
    });
    //some time later
    setTimeout(() => {
      people.subscribe(v => {
        console.log(v)
      });
      people.subscribe(v => {
        console.log(v)
      });
    },2000)
  }
}

Full plunker

编辑:BehaviorSubject是一种替代方案。在这个用例中,区别在于初始值,例如,如果我们想在使用HTTP响应更新之前显示缓存中的内容。

最新更新