这都是伪代码…
好的,这是我的场景,我有一个传入的数据流被解析成数据包。
有一个IObservable<Packets> Packets
每个数据包有一个数据包ID,即1,2,3,4
我想创建只接收特定ID的可观察对象。
所以我做了:
Packets.Where(p=>p.Id == 1)
…给我一个IObservable<Packets>
,它只给我Id 1的数据包。
我可能有几个:
Packets.Where(p=>p.Id == 2)
Packets.Where(p=>p.Id == 3)
Packets.Where(p=>p.Id == 4)
Packets.Where(p=>p.Id == 5)
这基本上是有效的,但是我想选择的Id越多,需要的处理就越多,即p=>p.Id
将为每个Id运行,即使在找到目标Observable之后。
如何使路由更有效,类似于:
字典听众;
listeners.GetValue (packet.Id) .OnDataReceived(包)
这样,当一个id被我的一个IObservables拾取时,其他的都看不到它?
更新根据Lee Campbell的groupby建议添加了一个扩展:
public static class IObservableExtensions
{
class RouteTable<TKey, TSource>
{
public static readonly ConditionalWeakTable<IObservable<TSource>, IObservable<IGroupedObservable<TKey, TSource>>> s_routes = new ConditionalWeakTable<IObservable<TSource>, IObservable<IGroupedObservable<TKey, TSource>>>();
}
public static IObservable<TSource> Route<TKey, TSource>(this IObservable<TSource> source, Func<TSource, TKey> selector, TKey id)
{
var grouped = RouteTable<TKey, TSource>.s_routes.GetValue(source, s => s.GroupBy(p => selector(p)).Replay().RefCount());
return grouped.Where(e => e.Key.Equals(id)).SelectMany(e => e);
}
}
可以这样使用:
Subject<Packet> packetSubject = new Subject<Packet>();
var packets = packetSubject.AsObservable();
packets.Route((p) => p.Id, 5).Subscribe((p) =>
{
Console.WriteLine("5");
});
packets.Route((p) => p.Id, 4).Subscribe((p) =>
{
Console.WriteLine("4");
});
packets.Route((p) => p.Id, 3).Subscribe((p) =>
{
Console.WriteLine("3");
});
packetSubject.OnNext(new Packet() { Id = 1 });
packetSubject.OnNext(new Packet() { Id = 2 });
packetSubject.OnNext(new Packet() { Id = 3 });
packetSubject.OnNext(new Packet() { Id = 4 });
packetSubject.OnNext(new Packet() { Id = 5 });
packetSubject.OnNext(new Packet() { Id = 4 });
packetSubject.OnNext(new Packet() { Id = 3 });
输出是:3、4、5、4、3
当它看到一个新的数据包Id时,它只检查每个组的Id。
这是我很久以前写的一个操作符,但我认为它可以满足您的要求。我仍然认为一个简单的.Where
可能更好——即使有多个订阅者。
然而,我想为可观察对象设置一个.ToLookup
,使其操作符与可枚举对象的操作符相同。
它不是内存高效的,但是它实现了IDisposable
,所以它可以在之后被清理。它也不是线程安全的,所以可能需要一点加固。
在这里:
public static class ObservableEx
{
public static IObservableLookup<K, V> ToLookup<T, K, V>(this IObservable<T> source, Func<T, K> keySelector, Func<T, V> valueSelector, IScheduler scheduler)
{
return new ObservableLookup<T, K, V>(source, keySelector, valueSelector, scheduler);
}
internal class ObservableLookup<T, K, V> : IDisposable, IObservableLookup<K, V>
{
private IDisposable _subscription = null;
private readonly Dictionary<K, ReplaySubject<V>> _lookups = new Dictionary<K, ReplaySubject<V>>();
internal ObservableLookup(IObservable<T> source, Func<T, K> keySelector, Func<T, V> valueSelector, IScheduler scheduler)
{
_subscription = source.ObserveOn(scheduler).Subscribe(
t => this.GetReplaySubject(keySelector(t)).OnNext(valueSelector(t)),
ex => _lookups.Values.ForEach(rs => rs.OnError(ex)),
() => _lookups.Values.ForEach(rs => rs.OnCompleted()));
}
public void Dispose()
{
if (_subscription != null)
{
_subscription.Dispose();
_subscription = null;
_lookups.Values.ForEach(rs => rs.Dispose());
_lookups.Clear();
}
}
private ReplaySubject<V> GetReplaySubject(K key)
{
if (!_lookups.ContainsKey(key))
{
_lookups.Add(key, new ReplaySubject<V>());
}
return _lookups[key];
}
public IObservable<V> this[K key]
{
get
{
if (_subscription == null) throw new ObjectDisposedException("ObservableLookup");
return this.GetReplaySubject(key).AsObservable();
}
}
}
}
public interface IObservableLookup<K, V> : IDisposable
{
IObservable<V> this[K key] { get; }
}
你可以这样使用:
IObservable<Packets> Packets = ...
IObservableLookup<int, Packets> lookup = Packets.ToLookup(p => p.Id, p => p, Scheduler.Default);
lookup[1].Subscribe(p => { });
lookup[2].Subscribe(p => { });
// etc
这样做的好处是,你可以在之前通过键订阅值,该键的值已经由源可观察对象生成。
不要忘记调用lookup.Dispose()
来清理资源。
我建议查看GroupBy
,然后检查是否有性能回报。我想有,但它重要吗?
Packets.GroupBy(p=>p.Id)
关于如何使用GroupBy
作为路由器类型的示例代码
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
ReactiveTest.OnNext(100, 1),
ReactiveTest.OnNext(200, 2),
ReactiveTest.OnNext(300, 3),
ReactiveTest.OnNext(400, 4),
ReactiveTest.OnNext(500, 5),
ReactiveTest.OnNext(600, 6),
ReactiveTest.OnNext(700, 7),
ReactiveTest.OnNext(800, 8),
ReactiveTest.OnNext(900, 9),
ReactiveTest.OnNext(1000, 10),
ReactiveTest.OnNext(1100, 11)
);
var router = source.GroupBy(i=>i%4)
.Publish()
.RefCount();
var zerosObserver = scheduler.CreateObserver<int>();
router.Where(grp=>grp.Key == 0)
.Take(1)
.SelectMany(grp=>grp)
.Subscribe(zerosObserver);
var onesObserver = scheduler.CreateObserver<int>();
router.Where(grp => grp.Key == 1)
.Take(1)
.SelectMany(grp => grp)
.Subscribe(onesObserver);
var twosObserver = scheduler.CreateObserver<int>();
router.Where(grp => grp.Key == 2)
.Take(1)
.SelectMany(grp => grp)
.Subscribe(twosObserver);
var threesObserver = scheduler.CreateObserver<int>();
router.Where(grp => grp.Key == 3)
.Take(1)
.SelectMany(grp => grp)
.Subscribe(threesObserver);
scheduler.Start();
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(400, 4), ReactiveTest.OnNext(800, 8)}, zerosObserver.Messages);
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(100, 1), ReactiveTest.OnNext(500, 5), ReactiveTest.OnNext(900, 9)}, onesObserver.Messages);
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(200, 2), ReactiveTest.OnNext(600, 6), ReactiveTest.OnNext(1000, 10) }, twosObserver.Messages);
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(300, 3), ReactiveTest.OnNext(700, 7), ReactiveTest.OnNext(1100, 11)}, threesObserver.Messages);
您可以使用GroupBy
来分割数据。我建议你先设置好所有的订阅,然后激活你的源。这样做会导致一个巨大的嵌套GroupBy
查询,但也可以多cast您的组并单独订阅它们。我在下面写了一个小的帮助工具来完成这个任务。
因为您仍然可能希望在源被激活后添加新的路由(通过Connect
完成),我们使用Replay
来重播组。Replay
也是一个多强制转换操作符,所以我们不需要Publish
来进行多强制转换。
public sealed class RouteData<TKey, TSource>
{
private IConnectableObservable<IGroupedObservable<TKey, TSource>> myRoutes;
public RouteData(IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
this.myRoutes = source.GroupBy(keySelector).Replay();
}
public IDisposable Connect()
{
return this.myRoutes.Connect();
}
public IObservable<TSource> Get(TKey id)
{
return myRoutes.FirstAsync(e => e.Key.Equals(id)).Merge();
}
}
public static class myExtension
{
public static RouteData<TKey, TSource> RouteData<TKey, TSource>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
return new RouteData<TKey, TSource>(source, keySelector);
}
}
使用例子:
public class myPackage
{
public int Id;
public myPackage(int id)
{
this.Id = id;
}
}
class program
{
static void Main()
{
var source = new[] { 0, 1, 2, 3, 4, 5, 4, 3 }.ToObservable().Select(i => new myPackage(i));
var routes = source.RouteData(e => e.Id);
var subscription = new CompositeDisposable(
routes.Get(5).Subscribe(Console.WriteLine),
routes.Get(4).Subscribe(Console.WriteLine),
routes.Get(3).Subscribe(Console.WriteLine),
routes.Connect());
Console.ReadLine();
}
}
您可能需要考虑编写一个自定义IObserver来执行您的命令。我在下面包含了一个例子。
void Main()
{
var source = Observable.Range(1, 10);
var switcher = new Switch<int, int>(i => i % 3);
switcher[0] = Observer.Create<int>(val => Console.WriteLine($"{val} Divisible by three"));
source.Subscribe(switcher);
}
class Switch<TKey,TValue> : IObserver<TValue>
{
private readonly IDictionary<TKey, IObserver<TValue>> cases;
private readonly Func<TValue,TKey> idExtractor;
public IObserver<TValue> this[TKey decision]
{
get
{
return cases[decision];
}
set
{
cases[decision] = value;
}
}
public Switch(Func<TValue,TKey> idExtractor)
{
this.cases = new Dictionary<TKey, IObserver<TValue>>();
this.idExtractor = idExtractor;
}
public void OnNext(TValue next)
{
IObserver<TValue> nextCase;
if (cases.TryGetValue(idExtractor(next), out nextCase))
{
nextCase.OnNext(next);
}
}
public void OnError(Exception e)
{
foreach (var successor in cases.Values)
{
successor.OnError(e);
}
}
public void OnCompleted()
{
foreach (var successor in cases.Values)
{
successor.OnCompleted();
}
}
}
您显然需要实现idExtractor来从数据包中提取id。