随机超时(时间跨度)在 Rx 中不起作用



之前的帖子似乎不是很清楚,所以经过一些测试,我重新打开了这个帖子,更简化的文字,希望有人能帮助。

我的单例可观察对象从多个 I/O事件源转变为,意味着它们同时在底层被提升,基于测试(为了证明Rx不是线程安全的)和Rx设计准则,我使其序列化,见lock(...):

public class EventFireCenter
{
    public static event EventHandler<GTCommandTerminalEventArg> OnTerminalEventArrived;
    private static object syncObject = new object();
    public static void TestFireDummyEventWithId(int id)
    {
        lock (syncObject)
        {
            var safe = OnTerminalEventArrived;
            if (safe != null)
            {
                safe(null, new GTCommandTerminalEventArg(id));
            }
        }
    }
}

这是一个单例Observable:

public class UnsolicitedEventCenter
{
    private readonly static IObservable<int> publisher;
    static UnsolicitedEventCenter()
    {
        publisher = Observable.FromEventPattern<GTCommandTerminalEventArg>(typeof(EventFireCenter), "OnTerminalEventArrived")
            .Select(s => s.EventArgs.Id);
    }
    private UnsolicitedEventCenter() { }
    /// <summary>
    /// Gets the Publisher property to start observe an observable sequence. 
    /// </summary>
    public static IObservable<int> Publisher { get { return publisher; } }
}

Subscribe(...)的场景可以用下面的代码描述,您可以看到Subscribe(...)可以在不同的线程中并发调用:

    for (var i = 0; i < concurrentCount; i++)
    {
        var safe = i;
        Scheduler.Default.Schedule(() =>
        {
            IDisposable dsp = null;
            dsp = UnsolicitedEventCenter.Publisher
                .Timeout(TimeSpan.FromMilliseconds(8000))
                .Where(incomingValue => incomingValue == safe)
                .ObserveOn(Scheduler.Default)
                //.Take(1)
                .Subscribe((incomingEvent) =>
                {
                    Interlocked.Increment(ref onNextCalledTimes);
                    dsp.Dispose();
                }
                , ex =>
                {
                    Interlocked.Increment(ref timeoutExceptionOccurredTimes);
                    lock (timedOutEventIds)
                    {
                        // mark this id has been timed out, only for unit testing result check.
                        timedOutEventIds.Add(safe);
                    }
                    dsp.Dispose();
                });
            Interlocked.Increment(ref threadPoolQueuedTaskCount);
        });
    }

正如经验丰富的人多次指出的那样,不建议在OnNext(...)中调用Dispose(),但是我们在这里忽略它,因为代码来自生产。

现在的问题是随机.Timeout(TimeSpan.FromMilliseconds(8000))不工作,ex从未被调用,任何人都可以看到代码中的任何异常?

对于测试,我设置了压力测试,但到目前为止,我没有复制它,而在生产中,它每天出现几次。为了以防万一,我粘贴了所有的测试代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Rx
{
    class Program
    {
        static void Main(string[] args)
        {
            // avoid thread creation delay in thread pool.
            ThreadPool.SetMinThreads(200, 50);
            // let the test run for 100 times
            for (int t = 0; t < 100; t++)
            {
                Console.WriteLine("");
                Console.WriteLine("======Current running times: " + t);
                // at meantime, 150 XXX.Subscribe(...) will be called.
                const int concurrentCount = 150;
                // how many fake event will be fire to santisfy that 150 XXX.Subscribe(...).   
                const int fireFakeEventCount = 40;
                int timeoutExceptionOccurredTimes = 0;
                var timedOutEventIds = new List<int>();
                int onNextCalledTimes = 0;
                int threadPoolQueuedTaskCount = 0;
                for (var i = 0; i < concurrentCount; i++)
                {
                    var safe = i;
                    Scheduler.Default.Schedule(() =>
                    {
                        IDisposable dsp = null;
                        dsp = UnsolicitedEventCenter.Publisher
                            .Timeout(TimeSpan.FromMilliseconds(8000))
                            .Where(incomingValue => incomingValue == safe)
                            .ObserveOn(Scheduler.Default)
                            //.Take(1)
                            .Subscribe((incomingEvent) =>
                            {
                                Interlocked.Increment(ref onNextCalledTimes);
                                dsp.Dispose();
                            }
                            , ex =>
                            {
                                Interlocked.Increment(ref timeoutExceptionOccurredTimes);
                                lock (timedOutEventIds)
                                {
                                    // mark this id has been timed out, only for unit testing result check.
                                    timedOutEventIds.Add(safe);
                                }
                                dsp.Dispose();
                            });
                        Interlocked.Increment(ref threadPoolQueuedTaskCount);
                    });
                }
                Console.WriteLine("Starting fire event: " + DateTime.Now.ToString("HH:mm:ss.ffff"));
                int threadPoolQueuedTaskCount1 = 0;
                // simulate a concurrent event fire
                for (int i = 0; i < fireFakeEventCount; i++)
                {
                    var safe = i;
                    Scheduler.Default.Schedule(() =>
                    {
                        EventFireCenter.TestFireDummyEventWithId(safe);
                        Interlocked.Increment(ref threadPoolQueuedTaskCount1);
                    });
                }
                // make sure all proceeding task has been done in threadPool.
                while (threadPoolQueuedTaskCount < concurrentCount)
                {
                    Thread.Sleep(1000);
                }
                // make sure all proceeding task has been done in threadPool.
                while (threadPoolQueuedTaskCount1 < fireFakeEventCount)
                {
                    Thread.Sleep(100);
                }

                Console.WriteLine("Finished fire event: " + DateTime.Now.ToString("HH:mm:ss.ffff"));
                // sleep a time which >3000ms.
                Thread.Sleep(8000);
                Console.WriteLine("timeoutExceptionOccurredTimes: " + timeoutExceptionOccurredTimes);
                Console.WriteLine("onNextCalledTimes: " + onNextCalledTimes);
                if ((concurrentCount - fireFakeEventCount) != timeoutExceptionOccurredTimes)
                {
                    try
                    {
                        Console.WriteLine("Non timeout fired for these ids: " +
                           Enumerable.Range(0, concurrentCount)
                               .Except(timedOutEventIds).Except(Enumerable.Range(0, fireFakeEventCount)).Select(i => i.ToString())
                               .Aggregate((acc, n) => acc + "," + n));
                    }
                    catch (Exception ex) { Console.WriteLine("faild to output timedout ids..."); }
                    break;
                }
                if (fireFakeEventCount != onNextCalledTimes)
                {
                    Console.WriteLine("onNextOccurredTimes assert failed");
                    break;
                }
                if ((concurrentCount - fireFakeEventCount) != timeoutExceptionOccurredTimes)
                {
                    Console.WriteLine("timeoutExceptionOccurredTimes assert failed");
                    break;
                }
            }
            Console.WriteLine("");
            Console.WriteLine("");
            Console.WriteLine("DONE!");
            Console.ReadLine();
        }
    }
    public class EventFireCenter
    {
        public static event EventHandler<GTCommandTerminalEventArg> OnTerminalEventArrived;
        private static object syncObject = new object();
        public static void TestFireDummyEventWithId(int id)
        {
            lock (syncObject)
            {
                var safe = OnTerminalEventArrived;
                if (safe != null)
                {
                    safe(null, new GTCommandTerminalEventArg(id));
                }
            }
        }
    }
    public class UnsolicitedEventCenter
    {
        private readonly static IObservable<int> publisher;
        static UnsolicitedEventCenter()
        {
            publisher = Observable.FromEventPattern<GTCommandTerminalEventArg>(typeof(EventFireCenter), "OnTerminalEventArrived")
                .Select(s => s.EventArgs.Id);
        }
        private UnsolicitedEventCenter() { }
        /// <summary>
        /// Gets the Publisher property to start observe an observable sequence. 
        /// </summary>
        public static IObservable<int> Publisher { get { return publisher; } }
    }
    public class GTCommandTerminalEventArg : System.EventArgs
    {
        public GTCommandTerminalEventArg(int id)
        {
            this.Id = id;
        }
        public int Id { get; private set; }
    }
}

很可能Timeout没有触发,因为它Where过滤器之前。这意味着所有事件都通过并重置计时器,然后大多数事件被Where子句过滤。对于订阅的观察者来说,它似乎永远不会得到结果,也永远不会触发超时。将Timeout移动到Where之后,您现在应该有一个系统,如果单个观察者没有及时获得预期的事件,则会超时。

相关内容

  • 没有找到相关文章

最新更新