Chunkify纺纱拉入Rx



我试图使用Chunkify方法来"捕获"所有"挂起"的项。。但我发现了一个问题,消耗了一个线程的所有资源,有人知道为什么会发生这种情况吗?我该如何防止这种情况发生?

事实上,我的目标是为我的事件创建一个"垃圾邮件过滤器",只选择最后5个值​​,并且忽略两个以上的连续重复。

问题如何发生的示例:

注意下面的代码既愚蠢又毫无意义。这只是为了演示问题,并表明事件可以被多个线程调用(请运行上面的代码并观察输出窗口,这就是问题所在)。

    [TestMethod]
    public void ThreadSpinning()
    {
     var subs = Observable.FromEventPattern(add => this.Raise += add, rem => this.Raise -= rem)
                           .Select((item, countRaise) => countRaise)
                           .Chunkify()
                           .ToObservable(Scheduler.Default)
                           .Select((countRaise, countChunkify) => new { raiseItems = countRaise, countChunkify })
                           .Do(obj => Trace.Write("Chunkify = " + obj.countChunkify + " | "))
                           .Select(a => a.raiseItems)
                           .Where(a => a.Any())
                           .Do(obj =>
                           {
                               Trace.WriteLine("[ Start do something.. Raise = " + Dump(obj) + " ] " +
                                               Environment.NewLine + Environment.NewLine);
                               Thread.Sleep(700);
                           }).Subscribe();
        Thread.Sleep(2000);
        var handle = new ManualResetEventSlim(false);
        ThreadPool.QueueUserWorkItem(r =>
            {
                Thread.Sleep(500);
                Task.Factory.StartNew(() =>
                {
                    OnRaise();
                    OnRaise();
                }).Wait();
                OnRaise();
                Thread.Sleep(500);
                OnRaise();
                Task.Factory.StartNew(OnRaise).Wait();
                Thread.Sleep(1500);
                OnRaise();
                OnRaise();
                Thread.Sleep(500);
                OnRaise();
                Thread.Sleep(250);
                OnRaise();
                Task.Factory.StartNew(OnRaise).Wait();
                Thread.Sleep(500);
                Task.Factory.StartNew(OnRaise).Wait();
                Task.Factory.StartNew(OnRaise).Wait();
                Thread.Sleep(2000);
                handle.Set();
            });
        handle.Wait();
        Thread.Sleep(3000); 
        subs.Dispose();
        Thread.Sleep(1000); 
    }
    private event EventHandler Raise;
    protected virtual void OnRaise()
    {
        EventHandler handler = Raise;
        if (handler != null) 
            handler(this, EventArgs.Empty);
    }
    public static string Dump<T>(IEnumerable<T> source)
    {
        return source.Select(a => a.ToString()).Aggregate((a, b) => a + ", " + b);
    }

我不确定你到底想做什么,但你的代码有一些问题:

  • 您使用ChunkifyIObservable转换为IEnumerable,但随后又将其转换回IObservable,这有点奇怪。

  • 报表

              .Select((item, count) => new { item, count })
              .Do(obj => Trace.Write(obj.count + " | "))
              .Select(a => a.item)
              .Where(a => a.Any())
              .Do(obj => Trace.WriteLine("Do something.. " + obj.Dump()))
    

    是许多似乎只是为了调试目的而进行转换的代码。您只需在一个Do调用内的一个lambda语句中编写所有调试代码。

  • 您不应该创建新的Random对象,而应该重用一个对象并对其调用Next():http://msdn.microsoft.com/en-us/library/h343ddh9.aspx

  • 您过度使用Thread.Sleep,在可观察的序列中使用它是一种代码气味。尝试将代码转换为使用各种时间运算符,如ThrottleDelay。您可能还想使用Observable.Generate创建序列。

  • 有可能Chunkify实际上并不是你所认为的——你考虑过Buffer算子吗?这里有一个很好的时间运算符列表:http://introtorx.com/Content/v1.0.10621.0/13_TimeShiftedSequences.html#TimeShiftedSequences

  • 要测试代码,您不需要实际提升事件处理程序,只需通过生成一个可观察序列并订阅它来测试订阅代码。例如,如果您有一个方法SubscribeToMyEvent(IObservable<T>),那么您可以通过传递用FromEventPattern创建的可观察序列或用IntervalGenerate 创建的可观测序列来设置它

你的具体情况是什么?是什么触发了这些事件?您究竟是如何尝试更改事件流的?绘制大理石图(例如。http://channel9.msdn.com/blogs/j.van.gogh/reactive-extensions-api-in-depth-marble-diagrams-select--where)以便思考您的算法。

相关内容

  • 没有找到相关文章

最新更新