强制刷新计数类型“可观察”.缓冲区 c#



基于这个问题,讨论刷新基于时间的缓冲区:强制刷新到 Observable.Buffer c#,我很难弄清楚如何将那里给出的这个答案转换为我按计数而不是按时间缓冲的情况:

var subject = new Subject<Unit>();
var closing = Observable
    .Timer(new TimeSpan(0, 0, 1, 30))
    .Select(x => Unit.Default);
var query =
    mFluxObservable
        .Buffer(() => Observable
            .Amb(subject, closing)
            .Take(1));

我开始使用相同的Amb逻辑,使用"项目计数器"而不是计时器,但发现自己陷入了试图弄清楚如何重置它的兔子洞。

你能轻轻地把我推向如何实现我缺失的功能的方向吗?

var flusher = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(0.1));
var output = source.BufferExceptOnFlush(100, flusher);

我的消息来源是"热",如果有帮助的话......

PS:我可以使用Observable.Create和某种内部计数器来弄清楚一些事情,但并非没有锁定......

我认为您可以通过在关闭可观察量中使用源代码并将其与刷新可观察量合并来做到这一点。以下内容对我有用:

 var source = new Subject<Unit>();
 var flush = new Subject<Unit>();
 // close buffer every 3 values or when a flush value arrives
 var closing = source.Buffer(3) 
            .Select(x => Unit.Default)
            .Merge(flush);
 var query = source.Buffer(() => closing)
         .Subscribe(Console.WriteLine);
// some test values
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
// flush buffer
flush.OnNext(Unit.Default);

我认为Observable.Create<T>解决方案没有错。在这种情况下,我认为这个扩展应该可以工作

public static IObservable<IList<T>> BufferExceptOnFlush<T>(this IObservable<T> source,IObservable<Unit> flusher, int bufferSize)
{
 return Observable.Create<IList<T>>(observer =>
 {
     var shared = source.Publish();
     var closing = shared.Buffer(bufferSize).Select(x => Unit.Default);
     var query = shared.Buffer(() => flusher.Amb(closing).Take(1)).SubscribeSafe(observer);
     return new CompositeDisposable(query, shared.Connect());
 });

我还没有测试过它,但会启用这样的用法

var query = myFluxObservable.BufferExceptOnFlush(myFlusher, 5);

它适用于热可观察量和冷可观察量,因为没有双重订阅

这是我到目前为止得到的:

var flush = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => Unit.Default).Publish().RefCount();
var closer = CloseGenerator(source, flush, 5);
source.Buffer(closer)
//...
private IObservable<Unit> CloseGenerator<T>(IObservable<T> source, 
                                             IObservable<Unit> flusher, int count)
{
     return Observable.CombineLatest(
                        source.Select((_, i) => i), 
                        flusher.Select((_, i) => i).StartWith(-1))
             .Select(ar => Tuple.Create(ar[0], ar[1]))
             .Scan(Tuple.Create(-1, -1), (prev, next) =>
                 {
                     if(next.Item2 != prev.Item2 || next.Item1 == prev.Item1 + count)
                         return next;
                     else
                         return prev;
                 }
             )
             .DistinctUntilChanged().Skip(1) //This is 'DistinctExceptFirst'
             .Select(_ => Unit.Default);
}

相关内容

  • 没有找到相关文章