如何将可观察对象的值路由到不同的订阅者



这都是伪代码…

好的,这是我的场景,我有一个传入的数据流被解析成数据包。

有一个IObservable<Packets> Packets

每个数据包有一个数据包ID,即1,2,3,4

我想创建只接收特定ID的可观察对象。

所以我做了:

Packets.Where(p=>p.Id == 1)

例如

…给我一个IObservable<Packets>,它只给我Id 1的数据包。

我可能有几个:

Packets.Where(p=>p.Id == 2)
Packets.Where(p=>p.Id == 3)
Packets.Where(p=>p.Id == 4)
Packets.Where(p=>p.Id == 5)

这基本上是有效的,但是我想选择的Id越多,需要的处理就越多,即p=>p.Id将为每个Id运行,即使在找到目标Observable之后。

如何使路由更有效,类似于:

字典听众;

listeners.GetValue (packet.Id) .OnDataReceived(包)

这样,当一个id被我的一个IObservables拾取时,其他的都看不到它?

更新

根据Lee Campbell的groupby建议添加了一个扩展:

public static class IObservableExtensions
{
    class RouteTable<TKey, TSource>
    {
        public static readonly ConditionalWeakTable<IObservable<TSource>, IObservable<IGroupedObservable<TKey, TSource>>> s_routes = new ConditionalWeakTable<IObservable<TSource>, IObservable<IGroupedObservable<TKey, TSource>>>();
    }
    public static IObservable<TSource> Route<TKey, TSource>(this IObservable<TSource> source, Func<TSource, TKey> selector, TKey id)
    {
        var grouped = RouteTable<TKey, TSource>.s_routes.GetValue(source, s => s.GroupBy(p => selector(p)).Replay().RefCount());
        return grouped.Where(e => e.Key.Equals(id)).SelectMany(e => e);
    }
}

可以这样使用:

Subject<Packet> packetSubject = new Subject<Packet>();
        var packets = packetSubject.AsObservable();
        packets.Route((p) => p.Id, 5).Subscribe((p) =>
        {
            Console.WriteLine("5");
        });
        packets.Route((p) => p.Id, 4).Subscribe((p) =>
        {
            Console.WriteLine("4");
        });
        packets.Route((p) => p.Id, 3).Subscribe((p) =>
        {
            Console.WriteLine("3");
        });
        packetSubject.OnNext(new Packet() { Id = 1 });
        packetSubject.OnNext(new Packet() { Id = 2 });
        packetSubject.OnNext(new Packet() { Id = 3 });
        packetSubject.OnNext(new Packet() { Id = 4 });
        packetSubject.OnNext(new Packet() { Id = 5 });
        packetSubject.OnNext(new Packet() { Id = 4 });
        packetSubject.OnNext(new Packet() { Id = 3 });

输出是:3、4、5、4、3

当它看到一个新的数据包Id时,它只检查每个组的Id。

这是我很久以前写的一个操作符,但我认为它可以满足您的要求。我仍然认为一个简单的.Where可能更好——即使有多个订阅者。

然而,我想为可观察对象设置一个.ToLookup,使其操作符与可枚举对象的操作符相同。

它不是内存高效的,但是它实现了IDisposable,所以它可以在之后被清理。它也不是线程安全的,所以可能需要一点加固。

在这里:

public static class ObservableEx
{
    public static IObservableLookup<K, V> ToLookup<T, K, V>(this IObservable<T> source, Func<T, K> keySelector, Func<T, V> valueSelector, IScheduler scheduler)
    {
        return new ObservableLookup<T, K, V>(source, keySelector, valueSelector, scheduler);
    }
    internal class ObservableLookup<T, K, V> : IDisposable, IObservableLookup<K, V>
    {
        private IDisposable _subscription = null; 
        private readonly Dictionary<K, ReplaySubject<V>> _lookups = new Dictionary<K, ReplaySubject<V>>();
        internal ObservableLookup(IObservable<T> source, Func<T, K> keySelector, Func<T, V> valueSelector, IScheduler scheduler)
        {
            _subscription = source.ObserveOn(scheduler).Subscribe(
                t => this.GetReplaySubject(keySelector(t)).OnNext(valueSelector(t)),
                ex => _lookups.Values.ForEach(rs => rs.OnError(ex)),
                () => _lookups.Values.ForEach(rs => rs.OnCompleted()));
        }
        public void Dispose()
        {
            if (_subscription != null)
            {
                _subscription.Dispose();
                _subscription = null;
                _lookups.Values.ForEach(rs => rs.Dispose());
                _lookups.Clear();
            }
        }
        private ReplaySubject<V> GetReplaySubject(K key)
        {
            if (!_lookups.ContainsKey(key))
            {
                _lookups.Add(key, new ReplaySubject<V>());
            }
            return _lookups[key];
        }
        public IObservable<V> this[K key]
        {
            get
            {
                if (_subscription == null) throw new ObjectDisposedException("ObservableLookup");
                return this.GetReplaySubject(key).AsObservable();
            }
        }
    }
}
public interface IObservableLookup<K, V> : IDisposable
{
    IObservable<V> this[K key] { get; }
}

你可以这样使用:

IObservable<Packets> Packets = ...
IObservableLookup<int, Packets> lookup = Packets.ToLookup(p => p.Id, p => p, Scheduler.Default);
lookup[1].Subscribe(p => { });
lookup[2].Subscribe(p => { });
// etc

这样做的好处是,你可以在之前通过键订阅值,该键的值已经由源可观察对象生成。

不要忘记调用lookup.Dispose()来清理资源。

我建议查看GroupBy,然后检查是否有性能回报。我想有,但它重要吗?

Packets.GroupBy(p=>p.Id)

关于如何使用GroupBy作为路由器类型的示例代码

var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
    ReactiveTest.OnNext(100, 1),
    ReactiveTest.OnNext(200, 2),
    ReactiveTest.OnNext(300, 3),
    ReactiveTest.OnNext(400, 4),
    ReactiveTest.OnNext(500, 5),
    ReactiveTest.OnNext(600, 6),
    ReactiveTest.OnNext(700, 7),
    ReactiveTest.OnNext(800, 8),
    ReactiveTest.OnNext(900, 9),
    ReactiveTest.OnNext(1000, 10),
    ReactiveTest.OnNext(1100, 11)
    );
var router = source.GroupBy(i=>i%4)
    .Publish()
    .RefCount();
var zerosObserver = scheduler.CreateObserver<int>();
router.Where(grp=>grp.Key == 0)
    .Take(1)
    .SelectMany(grp=>grp)
    .Subscribe(zerosObserver);
var onesObserver = scheduler.CreateObserver<int>();
router.Where(grp => grp.Key == 1)
    .Take(1)
    .SelectMany(grp => grp)
    .Subscribe(onesObserver);
var twosObserver = scheduler.CreateObserver<int>();
router.Where(grp => grp.Key == 2)
        .Take(1)
        .SelectMany(grp => grp)
        .Subscribe(twosObserver);
var threesObserver = scheduler.CreateObserver<int>();
router.Where(grp => grp.Key == 3)
        .Take(1)
        .SelectMany(grp => grp)
        .Subscribe(threesObserver);
scheduler.Start();
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(400, 4), ReactiveTest.OnNext(800, 8)}, zerosObserver.Messages);
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(100, 1), ReactiveTest.OnNext(500, 5), ReactiveTest.OnNext(900, 9)}, onesObserver.Messages);
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(200, 2), ReactiveTest.OnNext(600, 6), ReactiveTest.OnNext(1000, 10) }, twosObserver.Messages);
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(300, 3), ReactiveTest.OnNext(700, 7), ReactiveTest.OnNext(1100, 11)}, threesObserver.Messages);

您可以使用GroupBy来分割数据。我建议你先设置好所有的订阅,然后激活你的源。这样做会导致一个巨大的嵌套GroupBy查询,但也可以多cast您的组并单独订阅它们。我在下面写了一个小的帮助工具来完成这个任务。

因为您仍然可能希望在源被激活后添加新的路由(通过Connect完成),我们使用Replay来重播组。Replay也是一个多强制转换操作符,所以我们不需要Publish来进行多强制转换。

public sealed class RouteData<TKey, TSource>
{
    private IConnectableObservable<IGroupedObservable<TKey, TSource>> myRoutes;
    public RouteData(IObservable<TSource> source, Func<TSource, TKey> keySelector)
    {
        this.myRoutes = source.GroupBy(keySelector).Replay();
    }
    public IDisposable Connect()
    {
        return this.myRoutes.Connect();
    }
    public IObservable<TSource> Get(TKey id)
    {
        return myRoutes.FirstAsync(e => e.Key.Equals(id)).Merge();
    }
}
public static class myExtension
{
    public static RouteData<TKey, TSource> RouteData<TKey, TSource>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
    {
        return new RouteData<TKey, TSource>(source, keySelector);
    }
}

使用例子:

public class myPackage
{
    public int Id;
    public myPackage(int id)
    {
        this.Id = id;
    }
}
class program
{
    static void Main()
    {
        var source = new[] { 0, 1, 2, 3, 4, 5, 4, 3 }.ToObservable().Select(i => new myPackage(i));
        var routes = source.RouteData(e => e.Id);
        var subscription = new CompositeDisposable(
            routes.Get(5).Subscribe(Console.WriteLine),
            routes.Get(4).Subscribe(Console.WriteLine),
            routes.Get(3).Subscribe(Console.WriteLine),
            routes.Connect());
        Console.ReadLine();
    }
}

您可能需要考虑编写一个自定义IObserver来执行您的命令。我在下面包含了一个例子。

void Main()
{
    var source = Observable.Range(1, 10);
    var switcher = new Switch<int, int>(i => i % 3);
    switcher[0] = Observer.Create<int>(val => Console.WriteLine($"{val} Divisible by three"));
    source.Subscribe(switcher);
}
class Switch<TKey,TValue> : IObserver<TValue>
{
    private readonly IDictionary<TKey, IObserver<TValue>> cases;
    private readonly Func<TValue,TKey> idExtractor;
    public IObserver<TValue> this[TKey decision]
    {
        get
        {
            return cases[decision];
        }
        set
        {
            cases[decision] = value;
        }
    }
    public Switch(Func<TValue,TKey> idExtractor)
    {
        this.cases = new Dictionary<TKey, IObserver<TValue>>();
        this.idExtractor = idExtractor;
    }
    public void OnNext(TValue next)
    {
        IObserver<TValue> nextCase;
        if (cases.TryGetValue(idExtractor(next), out nextCase))
        {
            nextCase.OnNext(next);
        }
    }
    public void OnError(Exception e)
    {
        foreach (var successor in cases.Values)
        {
            successor.OnError(e);
        }
    }
    public void OnCompleted()
    {
        foreach (var successor in cases.Values)
        {
            successor.OnCompleted();
        }
    }
}

您显然需要实现idExtractor来从数据包中提取id。

相关内容

  • 没有找到相关文章

最新更新