我有一个应用程序通过串行端口与设备通信。每个发送的命令都由包含状态/答案的数据事件应答。基本上有更改设备的命令和仅返回状态的命令。每次应答最后一个命令时(因此在接收数据时),应用应发送下一个命令或默认查询状态。我正在尝试使用 rxjs 对此进行建模。我的想法是,有一个命令可观察和一个从数据事件派生的数据可观察。这两者应该以这样的方式组合,即生成的可观察量仅在有数据时发出值,并将其与命令或默认命令(请求状态)组合在一起,如果没有命令从命令流中下来。
data: ---------d---d-----d---------d------d-------
command: --c1---c2----------------------c3-----------
______________________________________________________
combined ---------c1--c2----dc--------dc-----c3
dc 是默认命令。此外,不应丢失任何命令。
目前,我有一个匿名主题的实现,我自己实现了可观察和观察者。从数组中的命令流收集命令,订阅数据事件,使用 onNext 手动发布数据,并从数组或默认命令发送下一个命令。这有效,但我觉得这可以用 rxjs 更优雅地表达。
一种方法是使用单独的default_command流,每 100 毫秒重复一次默认命令。它与命令流合并,然后与数据流一起压缩。这里的问题是合并的命令流,因为它堆积了默认命令,但默认命令应该只在没有其他命令的情况下应用。
我唯一能想到的就是:
- 订阅命令流并将结果排队(在数组中)
- 将映射操作应用于将从队列中提取的数据流(如果队列为空,则使用 default)。
我们可以将其包装成一个通用的可观察运算符。 我不擅长名字,所以我把它称为zipWithDefault
:
Rx.Observable.prototype.zipWithDefault = function(bs, defaultB, selector) {
var source = this;
return Rx.Observable.create(function(observer) {
var sourceSubscription = new Rx.SingleAssignmentDisposable(),
bSubscription = new Rx.SingleAssignmentDisposable(),
subscriptions = new Rx.CompositeDisposable(sourceSubscription, bSubscription),
bQueue = [],
mappedSource = source.map(function(value) {
return selector(value, bQueue.length ? bQueue.shift() : defaultB);
});
bSubscription.setDisposable(bs.subscribe(
function(b) {
bQueue.push(b);
},
observer.onError.bind(observer)));
sourceSubscription.setDisposable(mappedSource.subscribe(observer));
return subscriptions;
});
};
并像这样使用它:
combined = dataStream
.zipWithDefault(commandStream, defaultCommand, function (data, command) {
return command;
});
我认为sample
运算符将是你最好的选择。 不幸的是,它没有内置的默认值,因此您必须从现有运算符中滚动自己的默认值:
Rx.Observable.prototype.sampleWithDefault = function(sampler, defaultValue){
var source = this;
return new Rx.AnonymousObservable(function (observer) {
var atEnd, value, hasValue;
function sampleSubscribe() {
observer.onNext(hasValue ? value : defaultValue);
hasValue = false;
}
function sampleComplete() {
atEnd && observer.onCompleted();
}
return new Rx.CompositeDisposable(
source.subscribe(function (newValue) {
hasValue = true;
value = newValue;
}, observer.onError.bind(observer), function () {
atEnd = true;
}),
sampler.subscribe(sampleSubscribe, observer.onError.bind(observer), sampleComplete)
);
}, source);
}
您可以使用 controlled
运算符实现排队行为。 因此,您的最终数据链如下所示:
var commands = getCommandSource().controlled();
var pipeline = commands
.sampleWithDefault(data, defaultCommand)
.tap(function() { commands.request(1); });
下面是一个完整的示例:
Rx.Observable.prototype.sampleWithDefault = function(sampler, defaultValue) {
var source = this;
return new Rx.AnonymousObservable(function(observer) {
var atEnd, value, hasValue;
function sampleSubscribe() {
observer.onNext(hasValue ? value : defaultValue);
hasValue = false;
}
function sampleComplete() {
atEnd && observer.onCompleted();
}
return new Rx.CompositeDisposable(
source.subscribe(function(newValue) {
hasValue = true;
value = newValue;
}, observer.onError.bind(observer), function() {
atEnd = true;
}),
sampler.subscribe(sampleSubscribe, observer.onError.bind(observer), sampleComplete)
);
}, source);
}
var scheduler = new Rx.TestScheduler();
var onNext = Rx.ReactiveTest.onNext;
var onCompleted = Rx.ReactiveTest.onCompleted;
var data = scheduler.createHotObservable(onNext(210, 18),
onNext(220, 17),
onNext(230, 16),
onNext(250, 15),
onCompleted(1000));
var commands = scheduler.createHotObservable(onNext(205, 'a'),
onNext(210, 'b'),
onNext(240, 'c'),
onNext(400, 'd'),
onCompleted(800))
.controlled(true, scheduler);
var pipeline = commands
.sampleWithDefault(data, 'default')
.tap(function() {
commands.request(1);
});
var output = document.getElementById("output");
pipeline.subscribe(function(x) {
var li = document.createElement("li");
var text = document.createTextNode(x);
li.appendChild(text);
output.appendChild(li);
});
commands.request(1);
scheduler.start();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.testing.js"></script>
<div>
<ul id="output" />
</div>
这可以通过使用扫描功能来解决。在累积值中,存储尚未收到任何数据响应的命令。
var result = Rx.Observable
.merge(data, command)
.scan(function (acc, x) {
if (x === 'd') {
acc.result = acc.commands.length > 0 ? acc.commands.shift() : 'dc';
} else {
acc.result = '';
acc.commands.push(x);
}
return acc;
}, {result: '', commands: []})
.map(function (x) {
return x.result;
})
.filter(function (x) {
return x !== '';
});
请在此处找到完整的更多详细信息:http://jsbin.com/tubade/edit?html,js,console