我有一个热的可观察量,它以随机的间隔发出,就像不同的数字一样。
1--1-----1--2-4
我正在寻找一种方法,当在预定义的间隔内存在重复项时,将该项目合并回序列,直到它找到绕过间隔阈值的空间。我已经实现了一个我认为不是最佳的解决方案,因为当我在生产中使用真实对象而不是整数进行测试时,它会在系统中产生一种背压,我看到 CPU 发疯了。以下是我到目前为止的测试。
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;
using Xunit;
using Xunit.Abstractions;
namespace Specs{
public class CollectDuplicatesSpecs:ReactiveTest{
private readonly ITestOutputHelper _outputHelper;
public CollectDuplicatesSpecs(ITestOutputHelper outputHelper){
_outputHelper = outputHelper;
}
[Fact]
public void MethodName(){
var testScheduler = new TestScheduler();
var hotObservable = testScheduler.CreateHotObservable(OnNext(10, 1), OnNext(20, 1), OnNext(30, 1),OnNext(40, 1));
var subject = new Subject<int>();
hotObservable.Merge(subject).Window(TimeSpan.FromTicks(20), testScheduler).Select(observable => {
observable.CollectDuplicates(i => i).Delay(TimeSpan.FromTicks(1), testScheduler).Subscribe(subject);
return observable.Distinct();
}).SelectMany(observable => observable).Subscribe(i => _outputHelper.WriteLine($"{testScheduler.Clock}-{i}"));
testScheduler.AdvanceBy(160);
}
}
public static class RxEx{
public static IObservable<TSource> CollectDuplicates<TSource>(this IObservable<TSource> source, Func<TSource, int> keySelector = null) {
return Observable.Create<TSource>(observer => {
var dubplicateCollector = new DubplicateCollector<TSource>(keySelector);
var duplicateCollectorSubscription = dubplicateCollector.Matches.Subscribe(observer);
var disposable = source.Distinct(dubplicateCollector).Finally(dubplicateCollector.Dispose).Subscribe();
return new CompositeDisposable(disposable, duplicateCollectorSubscription, dubplicateCollector);
});
}
}
public class DubplicateCollector<TSource> : IEqualityComparer<TSource>,IDisposable {
private readonly Func<TSource, int> _keySelector;
readonly Subject<TSource> _matches = new Subject<TSource>();
public DubplicateCollector(Func<TSource, int> keySelector) {
_keySelector = keySelector;
}
public IObservable<TSource> Matches => _matches;
public bool Equals(TSource x, TSource y) {
var equals = IsMatch(x, y);
if (equals)
_matches.OnNext(x);
return equals;
}
private bool IsMatch(TSource x, TSource y) {
if (_keySelector != null)
return _keySelector(x).Equals(_keySelector(y));
var equals = x != null && x.Equals(y);
return equals;
}
public int GetHashCode(TSource obj) {
return _keySelector(obj);
}
public void Dispose(){
_matches?.Dispose();
}
}
}
哪些打印
10-1
21-1
40-1
60-1
我正在努力得到你想要的东西:一些大理石图可能会有所帮助。我假设你本质上想要一个平滑运算符之类的东西:如果消息是突发的,那么随着时间的推移,它们会以某种方式平滑它们。
根据此答案,您可以创建一个处理平滑的运算符:
public static class ObservableDrainExtensions
{
public static IObservable<T> TimeDrained<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
{
return source.Drain(x => Observable.Empty<T>().Delay(ts, scheduler).StartWith(x));
}
public static IObservable<T> TimeDrained<T>(this IObservable<T> source, TimeSpan ts)
{
return TimeDrained(source, ts, Scheduler.Default);
}
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ => { }, () => queue.OnNext(new Unit()))
);
});
}
}
排水可以线性地平滑事情,TimeDrained
基于时间跨度这样做。您可以将其与GroupBy
结合使用以向其添加不同的元素:
[Fact]
public void MethodName()
{
var testScheduler = new TestScheduler();
var hotObservable = testScheduler.CreateHotObservable(
OnNext(10, 1),
OnNext(20, 1),
OnNext(30, 1),
OnNext(40, 1)
);
var ts = TimeSpan.FromTicks(20);
hotObservable
.GroupBy(i => i) //comparison key
.Select(g => g.TimeDrained(ts, testScheduler))
.Merge()
.Subscribe(i => Console.WriteLine($"{testScheduler.Clock}-{i}"));
testScheduler.AdvanceBy(160);
}
输出为:
10-1
30-1
50-1
70-1
如果这不是您要找的,那么请澄清问题。