我试图使用Chunkify方法来"捕获"所有"挂起"的项。。但我发现了一个问题,消耗了一个线程的所有资源,有人知道为什么会发生这种情况吗?我该如何防止这种情况发生?
事实上,我的目标是为我的事件创建一个"垃圾邮件过滤器",只选择最后5个值,并且忽略两个以上的连续重复。
问题如何发生的示例:
注意下面的代码既愚蠢又毫无意义。这只是为了演示问题,并表明事件可以被多个线程调用(请运行上面的代码并观察输出窗口,这就是问题所在)。
[TestMethod]
public void ThreadSpinning()
{
var subs = Observable.FromEventPattern(add => this.Raise += add, rem => this.Raise -= rem)
.Select((item, countRaise) => countRaise)
.Chunkify()
.ToObservable(Scheduler.Default)
.Select((countRaise, countChunkify) => new { raiseItems = countRaise, countChunkify })
.Do(obj => Trace.Write("Chunkify = " + obj.countChunkify + " | "))
.Select(a => a.raiseItems)
.Where(a => a.Any())
.Do(obj =>
{
Trace.WriteLine("[ Start do something.. Raise = " + Dump(obj) + " ] " +
Environment.NewLine + Environment.NewLine);
Thread.Sleep(700);
}).Subscribe();
Thread.Sleep(2000);
var handle = new ManualResetEventSlim(false);
ThreadPool.QueueUserWorkItem(r =>
{
Thread.Sleep(500);
Task.Factory.StartNew(() =>
{
OnRaise();
OnRaise();
}).Wait();
OnRaise();
Thread.Sleep(500);
OnRaise();
Task.Factory.StartNew(OnRaise).Wait();
Thread.Sleep(1500);
OnRaise();
OnRaise();
Thread.Sleep(500);
OnRaise();
Thread.Sleep(250);
OnRaise();
Task.Factory.StartNew(OnRaise).Wait();
Thread.Sleep(500);
Task.Factory.StartNew(OnRaise).Wait();
Task.Factory.StartNew(OnRaise).Wait();
Thread.Sleep(2000);
handle.Set();
});
handle.Wait();
Thread.Sleep(3000);
subs.Dispose();
Thread.Sleep(1000);
}
private event EventHandler Raise;
protected virtual void OnRaise()
{
EventHandler handler = Raise;
if (handler != null)
handler(this, EventArgs.Empty);
}
public static string Dump<T>(IEnumerable<T> source)
{
return source.Select(a => a.ToString()).Aggregate((a, b) => a + ", " + b);
}
我不确定你到底想做什么,但你的代码有一些问题:
-
您使用
Chunkify
从IObservable
转换为IEnumerable
,但随后又将其转换回IObservable
,这有点奇怪。 -
报表
.Select((item, count) => new { item, count }) .Do(obj => Trace.Write(obj.count + " | ")) .Select(a => a.item) .Where(a => a.Any()) .Do(obj => Trace.WriteLine("Do something.. " + obj.Dump()))
是许多似乎只是为了调试目的而进行转换的代码。您只需在一个
Do
调用内的一个lambda语句中编写所有调试代码。 -
您不应该创建新的
Random
对象,而应该重用一个对象并对其调用Next()
:http://msdn.microsoft.com/en-us/library/h343ddh9.aspx -
您过度使用
Thread.Sleep
,在可观察的序列中使用它是一种代码气味。尝试将代码转换为使用各种时间运算符,如Throttle
和Delay
。您可能还想使用Observable.Generate
创建序列。 -
有可能
Chunkify
实际上并不是你所认为的——你考虑过Buffer
算子吗?这里有一个很好的时间运算符列表:http://introtorx.com/Content/v1.0.10621.0/13_TimeShiftedSequences.html#TimeShiftedSequences -
要测试代码,您不需要实际提升事件处理程序,只需通过生成一个可观察序列并订阅它来测试订阅代码。例如,如果您有一个方法
SubscribeToMyEvent(IObservable<T>)
,那么您可以通过传递用FromEventPattern
创建的可观察序列或用Interval
或Generate
创建的可观测序列来设置它
你的具体情况是什么?是什么触发了这些事件?您究竟是如何尝试更改事件流的?绘制大理石图(例如。http://channel9.msdn.com/blogs/j.van.gogh/reactive-extensions-api-in-depth-marble-diagrams-select--where)以便思考您的算法。