将一个可观察对象转换为具有不同发射间隔的另一个可观察对象



我有一个热的可观察量,它以随机的间隔发出,就像不同的数字一样。

1-

-1-----1--2-4

我正在寻找一种方法,当在预定义的间隔内存在重复项时,将该项目合并回序列,直到它找到绕过间隔阈值的空间。我已经实现了一个我认为不是最佳的解决方案,因为当我在生产中使用真实对象而不是整数进行测试时,它会在系统中产生一种背压,我看到 CPU 发疯了。以下是我到目前为止的测试。

using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;
using Xunit;
using Xunit.Abstractions;
namespace Specs{
    public  class CollectDuplicatesSpecs:ReactiveTest{
        private readonly ITestOutputHelper _outputHelper;
        public CollectDuplicatesSpecs(ITestOutputHelper outputHelper){
            _outputHelper = outputHelper;
        }
        [Fact]
        public void MethodName(){
            var testScheduler = new TestScheduler();
            var hotObservable = testScheduler.CreateHotObservable(OnNext(10, 1), OnNext(20, 1), OnNext(30, 1),OnNext(40, 1));
            var subject = new Subject<int>();
            hotObservable.Merge(subject).Window(TimeSpan.FromTicks(20), testScheduler).Select(observable => {
                observable.CollectDuplicates(i => i).Delay(TimeSpan.FromTicks(1), testScheduler).Subscribe(subject);
                return observable.Distinct();
            }).SelectMany(observable => observable).Subscribe(i => _outputHelper.WriteLine($"{testScheduler.Clock}-{i}"));
            testScheduler.AdvanceBy(160);
        }
    }
    public static class RxEx{
        public static IObservable<TSource> CollectDuplicates<TSource>(this IObservable<TSource> source, Func<TSource, int> keySelector = null) {
            return Observable.Create<TSource>(observer => {
                var dubplicateCollector = new DubplicateCollector<TSource>(keySelector);
                var duplicateCollectorSubscription = dubplicateCollector.Matches.Subscribe(observer);
                var disposable = source.Distinct(dubplicateCollector).Finally(dubplicateCollector.Dispose).Subscribe();
                return new CompositeDisposable(disposable, duplicateCollectorSubscription, dubplicateCollector);
            });
        }
    }
    public class DubplicateCollector<TSource> : IEqualityComparer<TSource>,IDisposable {
        private readonly Func<TSource, int> _keySelector;
        readonly Subject<TSource> _matches = new Subject<TSource>();
        public DubplicateCollector(Func<TSource, int> keySelector) {
            _keySelector = keySelector;
        }
        public IObservable<TSource> Matches => _matches;
        public bool Equals(TSource x, TSource y) {
            var equals = IsMatch(x, y);
            if (equals)
                _matches.OnNext(x);
            return equals;
        }
        private bool IsMatch(TSource x, TSource y) {
            if (_keySelector != null)
                return _keySelector(x).Equals(_keySelector(y));
            var equals = x != null && x.Equals(y);
            return equals;
        }
        public int GetHashCode(TSource obj) {
            return _keySelector(obj);
        }
        public void Dispose(){
            _matches?.Dispose();
        }
    }

}

哪些打印

10-1
21-1
40-1
60-1

我正在努力得到你想要的东西:一些大理石图可能会有所帮助。我假设你本质上想要一个平滑运算符之类的东西:如果消息是突发的,那么随着时间的推移,它们会以某种方式平滑它们。

根据此答案,您可以创建一个处理平滑的运算符:

public static class ObservableDrainExtensions
{
    public static IObservable<T> TimeDrained<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
    {
        return source.Drain(x => Observable.Empty<T>().Delay(ts, scheduler).StartWith(x));
    }
    public static IObservable<T> TimeDrained<T>(this IObservable<T> source, TimeSpan ts)
    {
        return TimeDrained(source, ts, Scheduler.Default);
    }
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}

排水可以线性地平滑事情,TimeDrained基于时间跨度这样做。您可以将其与GroupBy结合使用以向其添加不同的元素:

[Fact]
public void MethodName()
{
    var testScheduler = new TestScheduler();
    var hotObservable = testScheduler.CreateHotObservable(
        OnNext(10, 1), 
        OnNext(20, 1), 
        OnNext(30, 1), 
        OnNext(40, 1)
    );
    var ts = TimeSpan.FromTicks(20);
    hotObservable
        .GroupBy(i => i)            //comparison key
        .Select(g => g.TimeDrained(ts, testScheduler))
        .Merge()
        .Subscribe(i => Console.WriteLine($"{testScheduler.Clock}-{i}"));
    testScheduler.AdvanceBy(160);
}

输出为:

10-1
30-1
50-1
70-1

如果这不是您要找的,那么请澄清问题。

相关内容

  • 没有找到相关文章

最新更新