我需要创建一个消息序列。
- 我应该能够按顺序"推送"消息
- 我应该能够"查询"所有消息
- 每当有新消息时,我应该能够通知所有监听者(订阅者)
现在我完成了:
var math = require('mathjs');
var Rx = require('rx');
var _ = require('lodash');

// Subject that every produced message is pushed through; subscribers only
// see messages published after they subscribe.
var messagesSubject = new Rx.Subject();

// Accumulated list of all messages seen so far: each message is wrapped in a
// one-element array and folded into the running list with _.union.
// The map callback must declare `el`; without the parameter the body refers
// to an undeclared identifier and throws a ReferenceError on the first message.
// NOTE(review): the (seed, accumulator) argument order of scan differs between
// rx releases — confirm against the installed version.
var messagesPool = messagesSubject
  .map(function(el) { return [el]; })
  .scan([], _.union);

// Simulated subscribers: on a random subset of the 500 ms ticks, attach a new
// listener to the subject. x.value is the tick counter, used here as an id.
Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(function() {
    return math.randomInt(10) > 8;
  })
  .do(function(x) {
    messagesSubject.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text);
    });
  })
  .subscribe();

// Simulated producer: on a random subset of the 500 ms ticks, publish a
// message with a randomly picked text through the subject.
Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(function() {
    return math.randomInt(10) > 2;
  })
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three']) };
  })
  .subscribe(messagesSubject);
我如何把之前的所有消息(messagesPool)推送给每个新的订阅者?
附带问题:这是 Subject 的合理用例吗?还是我应该选择另一种类型的 Subject?
听起来你需要的是 ReplaySubject,而不是 Subject。
[ReplaySubject是一个]Subject,它缓冲它观察到的所有项目,并将它们重播给订阅的任何Observer。
正如其他人所指出的,ReplaySubject 在这里可以帮到你。
这可能意味着您可以删除消息池(messagesPool)功能。
如果你只是组合查询,也可以完全不使用 Subject:
var math = require('mathjs');
var Rx = require('rx');
var _ = require('lodash');

// Message source: on a random subset of the 500 ms ticks, emit a message with
// a randomly picked text. replay() wraps the sequence so that it can be
// connected once and replayed to subscribers.
var messages = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(function() {
    return math.randomInt(10) > 2;
  })
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three']) };
  })
  .replay();

//Randomly add subscribers (but this would only be dummy code, not suitable for prod)
var randomSubsriberAdder = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(function() {
    return math.randomInt(10) > 8;
  })
  .subscribe(function(x) {
    // x.value is the tick counter, used here as the subscriber's id.
    messages.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text);
    });
  });

// Connect exactly once, at top level (not inside a message handler), using the
// lower-case connect() method of the connectable observable.
var connection = messages.connect();
//messages will now be collecting all values.
// Late subscribers will get all previous values.
// As new values are published, existing subscribers will get the new value.
您最好使用硬编码的数据集和 Rx 的测试工具/库。通过这种方式,您可以控制要测试的各种边缘情况(早期订阅者、后期订阅者、中途取消订阅的订阅者、流上的静默期等)。
下面是一个不使用 Subject 的代码示例,它使用 Replay 语义和临时订阅者进行单元测试。在 Node 上使用 nodeunit 运行
(windows cmds)
npm install rx
npm install nodeunit
.\node_modules\.bin\nodeunit.cmd tests
以及 `tests` 目录中的代码。
var Rx = require('rx')
var onNext = Rx.ReactiveTest.onNext,
onError = Rx.ReactiveTest.onError,
onCompleted = Rx.ReactiveTest.onCompleted,
subscribe = Rx.ReactiveTest.subscribe;
exports.testingReplayWithTransientSubscribers = function(test){
//Declare that we expect to have 3 asserts enforced.
test.expect(3);
//Control time with a test scheduler
var scheduler = new Rx.TestScheduler();
//Create our known message that will be published at known times (all times in milliseconds).
var messages = scheduler.createColdObservable(
onNext(0500, 'one'),
onNext(1000, 'two'),
onNext(2000, 'three'),
onNext(3500, 'four'),
onNext(4000, 'five')
);
//Replay all messages, and connect the reply decorator.
var replay = messages.replay();
var connection = replay.connect();
//Create 3 observers to subscribe/unsubscribe at various times.
var observerA = scheduler.createObserver();
var observerB = scheduler.createObserver();
var observerC = scheduler.createObserver();
//Subscribe immediately
var subA = replay.subscribe(observerA);
//Subscribe late, missing 1 message
var subB = Rx.Disposable.empty;
scheduler.scheduleAbsolute(null, 0800, function(){subB = replay.subscribe(observerB);});
//Subscribe late, and dispose before any live message happen
var subC = Rx.Disposable.empty;
scheduler.scheduleAbsolute(null, 1100, function(){subC = replay.subscribe(observerC);});
scheduler.scheduleAbsolute(null, 1200, function(){subC.dispose();});
//Dispose early
scheduler.scheduleAbsolute(null, 3000, function(){subB.dispose();});
//Start virutal time. Run through all the scheduled work (publishing messages, subscribing and unsubscribing)
scheduler.start();
//Assert our assumptions.
test.deepEqual(observerA.messages, [
onNext(0500, 'one'),
onNext(1000, 'two'),
onNext(2000, 'three'),
onNext(3500, 'four'),
onNext(4000, 'five')
],
"ObserverA should receive all values");
test.deepEqual(observerB.messages, [
onNext(0800, 'one'),
onNext(1000, 'two'),
onNext(2000, 'three'),
],
"ObserverB should receive initial value on subscription, and then two live values");
test.deepEqual(observerC.messages, [
onNext(1100, 'one'),
onNext(1100, 'two'),
],
"ObserverC should only receive initial values on subscription");
test.done();
};