以最大速率处理请求



我使用Rx来确保我们的后端遵守一些第三方API的请求限制。

下面的实现使用简单的Subject<T>作为输入队列,然后使用James World的自定义Pace操作符驯服

这是可行的,但前提是throttledRequests不在主线程上被ObserveOn(TaskPoolScheduler.Default)强制执行。

一旦我注释掉这一行(第61行),程序的行为就好像根本没有使用Pace操作符一样,并且请求再次得到处理的速度与它们排队的速度一样快。有人能解释这种行为吗?

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
    public static class ObservableExtensions
    {
        /// <summary>
        /// James World's Pace operater (see https://stackoverflow.com/a/21589238/88513)
        /// </summary>
        public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
        {
            return source.Select(i => Observable.Empty<T>()
                    .Delay(interval)
                    .StartWith(i))
                .Concat();
        }
    }
    class Program
    {
        ISubject<int> requests;
        IObservable<int> throttledRequests;
        private Task<T> QueueRequest<T>(int work, Func<int, Task<T>> doWork)
        {
            var task = throttledRequests
                .Where(x => x == work)
                .Take(1)
                .SelectMany(doWork)
                .ToTask();
            // queue it
            requests.OnNext(work);
            return task;
        }
        private Task<int> DoRequest(int x)
        {
            Console.WriteLine("{0:T}: DoRequest({1}) on TID {2}", DateTime.UtcNow, x, Thread.CurrentThread.ManagedThreadId);
            return Task.FromResult(x);
        }
        private void Run()
        {
            // initialize request queue
            requests = new Subject<int>();
            // create a derived rate-limited queue
            throttledRequests = requests
                .Pace(TimeSpan.FromMilliseconds(1000))
                .Publish()
                .RefCount()
                .ObserveOn(TaskPoolScheduler.Default);
            Console.WriteLine("Main TID: {0}", Thread.CurrentThread.ManagedThreadId);
            int i = 0;
            while (true)
            {
                // Queue a number of requests
                var tasks = Enumerable.Range(i * 10, 10)
                    .Select(x => QueueRequest(x, DoRequest))
                    .ToArray();
                Task.WaitAll(tasks);
                Console.ReadLine();
                i++;
            }
        }
        static void Main(string[] args)
        {
            new Program().Run();
        }
    }
}

我不能完整地回答这个问题(不确定为什么它运行在ThreadPoolScheduler上),但我会给你我的想法,并展示如何修复它,使其在有或没有ThreadPoolScheduler的情况下按预期运行。

首先,你可能会注意到,即使在ThreadPoolScheduler上,它也不能正常工作-通常前1-3项被处理而没有任何延迟。为什么在那之后,他们开始处理延迟,但我仍然不清楚。现在说说原因。考虑以下示例代码:

var result = Observable.Range(0, 10).Delay(TimeSpan.FromSeconds(10)).StartWith(1).Take(1).ToTask().Result;

在这里,不会有延误,任务会立即完成。为什么?因为StartWith会立即在序列的开始处注入"1",然后Take(1)获取这个值并完成——没有理由继续执行序列,所以永远不会执行delay。例如,如果你使用Take(2),它将在完成前延迟10秒。

出于完全相同的原因,您的代码永远不会进入延迟状态(例如,您可以通过在delay之后选择调试器并登录到控制台来验证这一点)。要修复,只需删除Take(1)(或更改为Take(2)为例)-无论如何,每个键总是只有一个项目。当您这样做时,无论是否使用ThreadPoolScheduler,代码都将正确运行。

相关内容

  • 没有找到相关文章