RX:如何处理序列中的 n 个缓冲项,然后等待 t 秒,然后再处理接下来的 n 个项



我正在尝试弄清楚如何处理序列中的 n 个缓冲项目,然后在处理接下来的 n 个项目之前等待 t 秒?

这是我尝试做的一个粗略形式,使用 Thread.Sleep()。 我想避免 Thread.Sleep() 并正确地做到这一点。

static void Main(string[] args)
{
    var t = Observable.Range(0, 100000);
    var query = t.Buffer(20);                       
    query.ObserveOn(NewThreadScheduler.Default)
        .Subscribe(x => DoStuff(x));
    Console.WriteLine("Press ENTER to exit");
    Console.ReadLine();
}
static void DoStuff(IList<int> list)
{
    Console.WriteLine(DateTime.Now);
    foreach (var value in list)
    {
        Console.WriteLine(value);
    }
    Thread.Sleep(TimeSpan.FromSeconds(10));
}

谁能帮我找到一种更 RX 的方式来做到这一点?

谢谢

闪光

// Instantiate this once, we'll use it in a closure multiple times.
var delay = Observable.Empty<int>().Delay(TimeSpan.FromMilliseconds(10));
// start with a source of individual items to be worked.
Observable.Range(0, 100000)
    // Create batches of work.
    .Buffer(20)
    // Select an observable for the batch of work, and concat a delay.
    .Select(batch => batch.ToObservable().Concat(delay))
    // Concat those together and form a "process, delay, repeat" observable.
    .Concat()
    // Subscribe!
    .Subscribe(Console.WriteLine);
// Make sure we wait for our work to be done.
// There are other ways to sync up, like async / await.
Console.ReadLine();

或者,您也可以使用 async/await 进行同步:

static IObservable<int> delay = Observable.Empty<int>().Delay(TimeSpan.FromMilliseconds(100));
static async Task Run()
{
    await Observable.Range(0, 1000)
        .Buffer(20)
        .Select(batch => batch.ToObservable().Concat(delay))
        .Concat()
        .Do(Console.WriteLine)
        .LastOrDefaultAsync();
}

这难道不是一个delay俏皮的把戏吗?它之所以有效,是因为 OnDone 就像 OnNext 一样延迟!

建立在克里斯托弗的回答之上,如果你不想把列表元素弄平,你可以这样做:

var delay = Observable.Empty<IList<int>>().Delay(TimeSpan.FromSeconds(10));
var query = Observable.Range(0, 100000)
                      .Buffer(20)
                      .Select(batch => Observable.Return(batch).Concat(delay))
                      .Concat();
query.Subscribe(list =>
                    {
                        Console.WriteLine(DateTime.Now);
                        foreach (var value in list)
                        {
                            Console.WriteLine(value);
                        }
                    });

相关内容

  • 没有找到相关文章

最新更新