受试者.消息侦听器示例



我需要创建一个消息序列。

  • 我应该能够按顺序"推送"消息
  • 我应该能够"查询"所有消息
  • 我应该能够在一条新消息中通知所有听众

现在我完成了:

var math = require('mathjs')
var Rx = require('rx')
var _ = require('lodash')
var messagesSubject = new Rx.Subject()
var messagesPool = messagesSubject.map(function() { return [el]}).scan([], _.union)

Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 8;}
  )
  .do(function(x) {
    messagesSubject.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text)
    })
  }).subscribe()
Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 2;}
  )
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three'])}
  }).subscribe(messagesSubject)

我如何用以前的所有消息(messagesPool)通知每个新订户?

附带问题:这是主题的有效用例吗?还是我应该选择另一种类型的科目?

听起来你在寻找ReplaySubject而不是Subject

[ReplaySubject是一个]Subject,它缓冲它观察到的所有项目,并将它们重播给订阅的任何Observer。

正如其他人所指出的,ReplaySubject可以在这里成为您的朋友。

这可能意味着您可以删除邮件池功能。

如果你只是编写查询,你也可以完全摆脱这个主题:

var math = require('mathjs')
var Rx = require('rx')
var _ = require('lodash')
var messages = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 2;}
  )
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three'])}
  })
  .replay();
//Randomly add subscribers (but this would only be dummy code, not suitable for prod)
var randomSubsriberAdder = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 8;}
  )
  .subscribe(function(x) {
    messages.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text);
var connection = messages.Connect();
//messages will now be collecting all values.
//  Late subscribers will get all previous values.
//  As new values are published, existing subscribers will get the new value.

您最好使用硬编码的数据集和Rx测试工具/lib。通过这种方式,您可以控制正在测试的边缘情况(早期订户、后期订户、断开连接的订户、流上的静音等)

一个不使用主题的代码示例,使用Replay语义和临时订阅者进行单元测试。在具有node-unit 的节点上运行

(windows cmds)

npm install rx
npm install node-unit
.node_modules.binnodeunit.cmd tests

以及CCD_ 6目录中的代码。

var Rx = require('rx')
var onNext = Rx.ReactiveTest.onNext,
    onError = Rx.ReactiveTest.onError,
    onCompleted = Rx.ReactiveTest.onCompleted,
    subscribe = Rx.ReactiveTest.subscribe;
exports.testingReplayWithTransientSubscribers = function(test){
    //Declare that we expect to have 3 asserts enforced.
    test.expect(3);
    //Control time with a test scheduler
    var scheduler = new Rx.TestScheduler();
    //Create our known message that will be published at known times (all times in milliseconds).
    var messages = scheduler.createColdObservable(
        onNext(0500, 'one'),
        onNext(1000, 'two'),
        onNext(2000, 'three'),
        onNext(3500, 'four'),
        onNext(4000, 'five')
    );
    //Replay all messages, and connect the reply decorator.
    var replay = messages.replay();
    var connection = replay.connect();
    //Create 3 observers to subscribe/unsubscribe at various times.
    var observerA = scheduler.createObserver();
    var observerB = scheduler.createObserver();
    var observerC = scheduler.createObserver();
    //Subscribe immediately
    var subA = replay.subscribe(observerA);
    //Subscribe late, missing 1 message
    var subB = Rx.Disposable.empty;
    scheduler.scheduleAbsolute(null, 0800, function(){subB = replay.subscribe(observerB);});
    //Subscribe late, and dispose before any live message happen
    var subC = Rx.Disposable.empty;
    scheduler.scheduleAbsolute(null, 1100, function(){subC = replay.subscribe(observerC);});
    scheduler.scheduleAbsolute(null, 1200, function(){subC.dispose();});
    //Dispose early 
    scheduler.scheduleAbsolute(null, 3000, function(){subB.dispose();});

    //Start virutal time. Run through all the scheduled work (publishing messages, subscribing and unsubscribing)
    scheduler.start();
    //Assert our assumptions.
    test.deepEqual(observerA.messages, [
            onNext(0500, 'one'),
            onNext(1000, 'two'),
            onNext(2000, 'three'),
            onNext(3500, 'four'),
            onNext(4000, 'five')
        ], 
        "ObserverA should receive all values");
    test.deepEqual(observerB.messages, [
            onNext(0800, 'one'),
            onNext(1000, 'two'),
            onNext(2000, 'three'),
        ], 
        "ObserverB should receive initial value on subscription, and then two live values");
    test.deepEqual(observerC.messages, [
            onNext(1100, 'one'),
            onNext(1100, 'two'),
        ], 
        "ObserverC should only receive initial values on subscription");
    test.done();
};

相关内容

  • 没有找到相关文章

最新更新