我有一个可观察的事件对象序列和一些处理特定类型事件的观察者。我需要完成以下场景:
- 某些事件类型需要由第一个符合条件的观察者处理(例如observable. subscribeexclusive (x=>{})),并且对其他观察者来说变成"不可观察的"。
- 如果没有订阅,设置一些默认处理程序(例如observable.SubscribeIfNoSubscriptions(x=>{})),这样就不会丢失项(例如,这个处理程序可以将项保存到数据库中以便稍后处理)。
是否有办法用Rx处理这些情况?
"排他性"更简单—您只需让其他人订阅排他性观察者的过滤输出。
"默认"更难——RX编程是函数式编程,订阅者彼此不知道,而根据定义,拥有"默认"订阅者意味着在观察者之间共享一些状态。共享状态的一种方法是使用来自TPL DataFlow的ConcurrentBag或BufferBlock创建一个生产者/消费者队列。另一种方法是使用如下所示的类将"已处理"状态附加到事件本身:
public class Handled<T>
{
public bool IsHandled { get; set; }
public T Data { get; set; }
}
在任何情况下,在使用"default"处理程序之前,您都必须给观察者一些时间来做出反应。下面的代码演示了"Exclusive"one_answers"Default"概念:
var source = new[] {0, 1, 2, 3, 4}.ToObservable();
var afterExclusive = source
.Where(x =>
{
if (x == 0)
{
Console.WriteLine("exclusive");
return false;
}
return true;
})
.Select(x => new Handled<int> {Data = x})
.Publish(); // publish is a must otherwise
afterExclusive // we'll get non shared objects
.Do(x => { x.IsHandled = true; })
.Subscribe();
afterExclusive
.Delay(TimeSpan.FromSeconds(1))
.Where(x => !x.IsHandled)
.Subscribe(x => Console.WriteLine("missed by all {0}", x));
afterExclusive.Connect();
我不确定我非常了解您的场景,但是您对此有何感想:
IObservable<Event> streamOfEvents.SelectMany(x => {
if (matchesExclusiveItem1(x)) {
x += exclusiveItem1Handler;
return Observable.Empty<Event>();
}
// Default case
return Observable.Return(x);
}).Subscribe(x => {
// Default case
x += defaultHandler;
});
我使用"事件对象",因为这是你指定的,但它可能会更好地使用IObservable<IObservable<T>>
-这个选择器有副作用(连接事件),这是不太好。