Rx - Distinct with timeout



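I'm wondering whether there is any way to implement Distinct in Reactive Extensions for .NET so that a value is allowed through again after a given time. I need this for a hot source in an application that will run for a whole year without stopping, so I'm worried about performance, and I need values to be allowed again after some time. There is also DistinctUntilChanged, but in my case the values can be interleaved. For example, for the sequence A X A, DistinctUntilChanged would give me A X A, whereas I need A X, and after a given time the distinct state should be reset.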

The accepted answer is flawed; the flaws are shown below. Here is a demonstration of the solution, with a test batch:

TestScheduler ts = new TestScheduler();
var source = ts.CreateHotObservable<char>(
    new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
    new Recorded<Notification<char>>(400.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('C')),
    new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('B')),
    new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('B'))
);
var target = source.TimedDistinct(TimeSpan.FromMilliseconds(300), ts);
var expectedResults = ts.CreateHotObservable<char>(
    new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
    new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('C')),
    new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('B'))
);
var observer = ts.CreateObserver<char>();
target.Subscribe(observer);
ts.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
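
To see why those are the expected results, the expiry rule can be traced by hand without Rx. The sketch below is a plain replay of the same seven events with a 300 ms window; the `TimedDistinctTrace` class and its `Trace` and `SampleRun` methods are hypothetical names introduced only for illustration, and it uses a local dictionary rather than the immutable state of the actual solution.

```csharp
using System;
using System.Collections.Generic;

public static class TimedDistinctTrace
{
    // Hypothetical helper: replays (timeMs, value) events and returns the values
    // that a timed distinct with the given window would let through.
    public static List<char> Trace(IEnumerable<Tuple<int, char>> events, int windowMs)
    {
        // value -> time (ms) at which it was last emitted
        var lastEmitted = new Dictionary<char, int>();
        var result = new List<char>();
        foreach (var e in events)
        {
            int emittedAt;
            bool suppressed = lastEmitted.TryGetValue(e.Item2, out emittedAt)
                              && e.Item1 - emittedAt < windowMs;
            if (suppressed)
            {
                continue; // still inside the window: drop the value
            }
            // The window restarts only when a value is actually emitted.
            lastEmitted[e.Item2] = e.Item1;
            result.Add(e.Item2);
        }
        return result;
    }

    // Replays the events from the test above with a 300 ms window.
    public static string SampleRun()
    {
        var events = new[]
        {
            Tuple.Create(200, 'A'), Tuple.Create(300, 'B'), Tuple.Create(400, 'A'),
            Tuple.Create(500, 'A'), Tuple.Create(510, 'C'), Tuple.Create(550, 'B'),
            Tuple.Create(610, 'B')
        };
        return string.Join(" ", Trace(events, 300)); // "A B A C B"
    }
}
```

The A at 400 ms is dropped because only 200 ms have passed since A was emitted at 200 ms; the A at 500 ms passes because exactly 300 ms have elapsed. Likewise the B at 550 ms is dropped (250 ms after 300 ms) and the B at 610 ms passes.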

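The solution includes a number of overloads of TimedDistinct, allowing IScheduler injection as well as IEqualityComparer<T> injection, similar to Distinct. Ignoring all those overloads, the solution rests on a helper method, StateWhere, which is something like a combination of Scan and Where: it filters like Where, but lets you embed state in it like Scan.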

public static class RxState
{
    public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime)
    {
        return TimedDistinct(source, expirationTime, Scheduler.Default);    
    }
    public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IScheduler scheduler)
    {
        return TimedDistinct<TSource>(source, expirationTime, EqualityComparer<TSource>.Default, scheduler);
    }
    public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IEqualityComparer<TSource> comparer)
    {
        return TimedDistinct(source, expirationTime, comparer, Scheduler.Default);
    }
    public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IEqualityComparer<TSource> comparer, IScheduler scheduler)
    {
        var toReturn = source
            .Timestamp(scheduler)
            .StateWhere(
                new Dictionary<TSource, DateTimeOffset>(comparer),
                (state, item) => item.Value,
                (state, item) => state
                    .Where(kvp => item.Timestamp - kvp.Value < expirationTime)
                    .Concat( 
                        !state.ContainsKey(item.Value) || item.Timestamp - state[item.Value] >= expirationTime
                            ? Enumerable.Repeat(new KeyValuePair<TSource, DateTimeOffset>(item.Value, item.Timestamp), 1)
                            : Enumerable.Empty<KeyValuePair<TSource, DateTimeOffset>>()
                    )
                    .ToDictionary(kvp => kvp.Key, kvp => kvp.Value, comparer),
                (state, item) => !state.ContainsKey(item.Value) || item.Timestamp - state[item.Value] >= expirationTime
        );
        return toReturn;
    }
    public static IObservable<TResult> StateSelectMany<TSource, TState, TResult>(
            this IObservable<TSource> source,
            TState initialState,
            Func<TState, TSource, IObservable<TResult>> resultSelector,
            Func<TState, TSource, TState> stateSelector
        )
    {
        return source
            .Scan(Tuple.Create(initialState, Observable.Empty<TResult>()), (state, item) => Tuple.Create(stateSelector(state.Item1, item), resultSelector(state.Item1, item)))
            .SelectMany(t => t.Item2);
    }
    public static IObservable<TResult> StateWhere<TSource, TState, TResult>(
            this IObservable<TSource> source,
            TState initialState,
            Func<TState, TSource, TResult> resultSelector,
            Func<TState, TSource, TState> stateSelector,
            Func<TState, TSource, bool> filter
        )
    {
        return source
            .StateSelectMany(initialState, (state, item) =>
                    filter(state, item) ? Observable.Return(resultSelector(state, item)) : Observable.Empty<TResult>(),
                stateSelector);
    }
}
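
As a rough illustration of the Scan-plus-Where idea on its own, here is a sketch of an analogous operator over IEnumerable<T>, which needs no Rx reference. The `EnumerableState` class and the `NewMaxima` example are hypothetical and not part of the answer; the parameter order simply mirrors the Rx version above. As in the Rx version, the filter and the result selector both see the state from before the current item is folded in.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EnumerableState
{
    // Hypothetical IEnumerable analogue of StateWhere: filters like Where,
    // while threading a state value through the sequence like Scan/Aggregate.
    public static IEnumerable<TResult> StateWhere<TSource, TState, TResult>(
        this IEnumerable<TSource> source,
        TState initialState,
        Func<TState, TSource, TResult> resultSelector,
        Func<TState, TSource, TState> stateSelector,
        Func<TState, TSource, bool> filter)
    {
        // The state is a local of the iterator, so every enumeration starts fresh.
        var state = initialState;
        foreach (var item in source)
        {
            if (filter(state, item))
            {
                yield return resultSelector(state, item);
            }
            state = stateSelector(state, item);
        }
    }

    // Example: keep only the values that exceed every value seen before them.
    public static List<int> NewMaxima(IEnumerable<int> values)
    {
        return values
            .StateWhere(
                int.MinValue,                      // state: running maximum
                (max, x) => x,                     // result: the item itself
                (max, x) => Math.Max(max, x),      // next state
                (max, x) => x > max)               // filter against the old state
            .ToList();
    }
}
```

For the input 3, 1, 4, 1, 5, 9, 2, 6 this yields 3, 4, 5, 9.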

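The accepted answer has two flaws:

  1. It does not accept IScheduler injection, which makes it hard to test with the Rx testing framework. This is easy to fix.
  2. It relies on mutable state, which does not work well in a multi-threaded framework like Rx.

Flaw #2 is evident with multiple subscribers: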

var observable = Observable.Range(0, 5)
    .DistinctFor(TimeSpan.MaxValue)
    ;
observable.Subscribe(i => Console.WriteLine(i));
observable.Subscribe(i => Console.WriteLine(i));

Following normal Rx behavior, this should print 0-4 twice. Instead, 0-4 is printed only once.

Here is another example of the flaw:

var subject = new Subject<int>();
var observable = subject
    .DistinctFor(TimeSpan.MaxValue);
observable.Subscribe(i => Console.WriteLine(i));
observable.Subscribe(i => Console.WriteLine(i));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);

This prints 1 2 3 once, not twice.
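
One common way to avoid sharing state between subscribers, separate from the immutable-state approach used by TimedDistinct, is to allocate the state inside Observable.Defer so that each subscription gets its own copy. The sketch below assumes the System.Reactive package is referenced; `DistinctForDeferred` is a hypothetical name, and the code still reads DateTime.UtcNow directly, so it does not address flaw #1.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DistinctForPerSubscription
{
    // Hypothetical variant of DistinctFor: the dictionary is created inside
    // Observable.Defer, so each subscriber gets a separate instance.
    // Note: a null item would throw, because Dictionary does not accept null keys.
    public static IObservable<T> DistinctForDeferred<T>(
        this IObservable<T> src, TimeSpan validityPeriod)
    {
        return Observable.Defer(() =>
        {
            // item -> time at which it was last let through
            var lastEmitted = new Dictionary<T, DateTime>();
            return src.Where(item =>
            {
                var now = DateTime.UtcNow;
                DateTime emittedAt;
                if (lastEmitted.TryGetValue(item, out emittedAt)
                    && now - emittedAt < validityPeriod)
                {
                    return false; // seen recently by this subscriber: suppress
                }
                lastEmitted[item] = now;
                return true;
            });
        });
    }
}
```

With this variant, subscribing twice to Observable.Range(0, 5).DistinctForDeferred(TimeSpan.MaxValue) should deliver 0-4 to each subscriber. The dictionary is still mutated in place, so this relies on the usual Rx guarantee that notifications within a single subscription are serialized.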


Here is the code for MsTicks:

public static class RxTestingHelpers
{
    public static long MsTicks(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}

A wrapper class that timestamps items, but does not take the timestamp (the created field) into account for hashing or equality:

public class DistinctForItem<T> : IEquatable<DistinctForItem<T>>
{
    private readonly T item;
    private DateTime created;
    public DistinctForItem(T item)
    {
        this.item = item;
        this.created = DateTime.UtcNow;
    }
    public T Item
    {
        get { return item; }
    }
    public DateTime Created
    {
        get { return created; }
    }
    public bool Equals(DistinctForItem<T> other)
    {
        if (ReferenceEquals(null, other)) return false;
        if (ReferenceEquals(this, other)) return true;
        return EqualityComparer<T>.Default.Equals(Item, other.Item);
    }
    public override bool Equals(object obj)
    {
        if (ReferenceEquals(null, obj)) return false;
        if (ReferenceEquals(this, obj)) return true;
        if (obj.GetType() != this.GetType()) return false;
        return Equals((DistinctForItem<T>)obj);
    }
    public override int GetHashCode()
    {
        return EqualityComparer<T>.Default.GetHashCode(Item);
    }
    public static bool operator ==(DistinctForItem<T> left, DistinctForItem<T> right)
    {
        return Equals(left, right);
    }
    public static bool operator !=(DistinctForItem<T> left, DistinctForItem<T> right)
    {
        return !Equals(left, right);
    }
}

The DistinctFor extension method can now be written:

public static IObservable<T> DistinctFor<T>(this IObservable<T> src, 
                                            TimeSpan validityPeriod)
{
    //If HashSet<DistinctForItem<T>> actually allowed us to get at the
    //items it contains, it would be a better choice.
    //However it doesn't, so we resort to
    //Dictionary<DistinctForItem<T>, DistinctForItem<T>> instead.
    var hs = new Dictionary<DistinctForItem<T>, DistinctForItem<T>>();
    return src.Select(item => new DistinctForItem<T>(item)).Where(df =>
    {
        DistinctForItem<T> hsVal;
        if (hs.TryGetValue(df, out hsVal))
        {
            var age = DateTime.UtcNow - hsVal.Created;
            if (age < validityPeriod)
            {
                return false;
            }
        }
        hs[df] = df;
        return true;
    }).Select(df => df.Item);
}

Which can be used like this:

Enumerable.Range(0, 1000)
    .Select(i => i % 3)
    .ToObservable()
    .Pace(TimeSpan.FromMilliseconds(500)) //drip feeds the observable
    .DistinctFor(TimeSpan.FromSeconds(5))
    .Subscribe(x => Console.WriteLine(x));

For reference, here is my implementation of Pace<T>:

public static IObservable<T> Pace<T>(this IObservable<T> src, TimeSpan delay)
{
    var timer = Observable
        .Timer(
            TimeSpan.FromSeconds(0),
            delay
        );
    return src.Zip(timer, (s, t) => s);
}
