我有一个发布通用消息的服务,我为这些消息创建了一个可观察的对象。这些消息可以包含任何内容,不同的协议可以在上面分层。
我希望添加第二个可观察层,以从这些消息中解释特定的协议。例如,消息的类型可以是"更新"、"错误"或"完成"。我希望重新发布"更新"消息,在"错误"上抛出错误,并在"完成"上完成序列。
我怎样才能做到这一点?
我认为我不能用SelectMany
来做这件事;虽然选择器可以在前两种情况下返回Observable.Return()
或Observable.Throw()
,但我无法从选择器中完成(调用observer.OnCompleted()
并取消订阅底层可观察对象)。
在我看来,我必须使用Observable.Create()
并订阅订阅方法中的底层可观察对象。我已经这样做了,但实现对我来说很奇怪,因为它没有使用Rx中更常见的函数组合风格。这是正确的方法吗?
public IObservable<Message> InterpretProtocol(IObservable<message> stream)
{
return Observable.Create<Message>(observer =>
{
return stream.Subscribe(message =>
{
switch (ProtocolMessageTypeOf(message))
{
case ProtocolMessageType.Error:
observer.OnError(new InvalidOperationException(message));
break;
case ProtocolMessageType.Complete:
observer.OnCompleted();
break;
default:
observer.OnNext(message);
break;
}
});
});
}
您可以尝试以下操作:
public IObservable<Message> InterpretProtocol(IObservable<message> stream) {
return stream.
TakeWhile(msg => ProtocolMessageTypeOf(message) != ProtocolMessageType.Complete).
Select(msg => {
if(ProtocolMessageTypeOf(message) == ProtocolMessageType.Error)
throw new InvalidOperationException(message);
else
return msg;
});
}