之前的帖子似乎不是很清楚,所以经过一些测试,我重新打开了这个帖子,更简化的文字,希望有人能帮助。
我的单例可观察对象从多个 I/O事件源转变为,意味着它们同时在底层被提升,基于测试(为了证明Rx不是线程安全的)和Rx设计准则,我使其序列化,见lock(...)
:
public class EventFireCenter
{
public static event EventHandler<GTCommandTerminalEventArg> OnTerminalEventArrived;
private static object syncObject = new object();
public static void TestFireDummyEventWithId(int id)
{
lock (syncObject)
{
var safe = OnTerminalEventArrived;
if (safe != null)
{
safe(null, new GTCommandTerminalEventArg(id));
}
}
}
}
这是一个单例Observable:
public class UnsolicitedEventCenter
{
private readonly static IObservable<int> publisher;
static UnsolicitedEventCenter()
{
publisher = Observable.FromEventPattern<GTCommandTerminalEventArg>(typeof(EventFireCenter), "OnTerminalEventArrived")
.Select(s => s.EventArgs.Id);
}
private UnsolicitedEventCenter() { }
/// <summary>
/// Gets the Publisher property to start observe an observable sequence.
/// </summary>
public static IObservable<int> Publisher { get { return publisher; } }
}
Subscribe(...)
的场景可以用下面的代码描述,您可以看到Subscribe(...)
可以在不同的线程中并发调用:
for (var i = 0; i < concurrentCount; i++)
{
var safe = i;
Scheduler.Default.Schedule(() =>
{
IDisposable dsp = null;
dsp = UnsolicitedEventCenter.Publisher
.Timeout(TimeSpan.FromMilliseconds(8000))
.Where(incomingValue => incomingValue == safe)
.ObserveOn(Scheduler.Default)
//.Take(1)
.Subscribe((incomingEvent) =>
{
Interlocked.Increment(ref onNextCalledTimes);
dsp.Dispose();
}
, ex =>
{
Interlocked.Increment(ref timeoutExceptionOccurredTimes);
lock (timedOutEventIds)
{
// mark this id has been timed out, only for unit testing result check.
timedOutEventIds.Add(safe);
}
dsp.Dispose();
});
Interlocked.Increment(ref threadPoolQueuedTaskCount);
});
}
正如经验丰富的人多次指出的那样,不建议在OnNext(...)
中调用Dispose()
,但是我们在这里忽略它,因为代码来自生产。
现在的问题是随机.Timeout(TimeSpan.FromMilliseconds(8000))
不工作,ex
从未被调用,任何人都可以看到代码中的任何异常?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Rx
{
class Program
{
static void Main(string[] args)
{
// avoid thread creation delay in thread pool.
ThreadPool.SetMinThreads(200, 50);
// let the test run for 100 times
for (int t = 0; t < 100; t++)
{
Console.WriteLine("");
Console.WriteLine("======Current running times: " + t);
// at meantime, 150 XXX.Subscribe(...) will be called.
const int concurrentCount = 150;
// how many fake event will be fire to santisfy that 150 XXX.Subscribe(...).
const int fireFakeEventCount = 40;
int timeoutExceptionOccurredTimes = 0;
var timedOutEventIds = new List<int>();
int onNextCalledTimes = 0;
int threadPoolQueuedTaskCount = 0;
for (var i = 0; i < concurrentCount; i++)
{
var safe = i;
Scheduler.Default.Schedule(() =>
{
IDisposable dsp = null;
dsp = UnsolicitedEventCenter.Publisher
.Timeout(TimeSpan.FromMilliseconds(8000))
.Where(incomingValue => incomingValue == safe)
.ObserveOn(Scheduler.Default)
//.Take(1)
.Subscribe((incomingEvent) =>
{
Interlocked.Increment(ref onNextCalledTimes);
dsp.Dispose();
}
, ex =>
{
Interlocked.Increment(ref timeoutExceptionOccurredTimes);
lock (timedOutEventIds)
{
// mark this id has been timed out, only for unit testing result check.
timedOutEventIds.Add(safe);
}
dsp.Dispose();
});
Interlocked.Increment(ref threadPoolQueuedTaskCount);
});
}
Console.WriteLine("Starting fire event: " + DateTime.Now.ToString("HH:mm:ss.ffff"));
int threadPoolQueuedTaskCount1 = 0;
// simulate a concurrent event fire
for (int i = 0; i < fireFakeEventCount; i++)
{
var safe = i;
Scheduler.Default.Schedule(() =>
{
EventFireCenter.TestFireDummyEventWithId(safe);
Interlocked.Increment(ref threadPoolQueuedTaskCount1);
});
}
// make sure all proceeding task has been done in threadPool.
while (threadPoolQueuedTaskCount < concurrentCount)
{
Thread.Sleep(1000);
}
// make sure all proceeding task has been done in threadPool.
while (threadPoolQueuedTaskCount1 < fireFakeEventCount)
{
Thread.Sleep(100);
}
Console.WriteLine("Finished fire event: " + DateTime.Now.ToString("HH:mm:ss.ffff"));
// sleep a time which >3000ms.
Thread.Sleep(8000);
Console.WriteLine("timeoutExceptionOccurredTimes: " + timeoutExceptionOccurredTimes);
Console.WriteLine("onNextCalledTimes: " + onNextCalledTimes);
if ((concurrentCount - fireFakeEventCount) != timeoutExceptionOccurredTimes)
{
try
{
Console.WriteLine("Non timeout fired for these ids: " +
Enumerable.Range(0, concurrentCount)
.Except(timedOutEventIds).Except(Enumerable.Range(0, fireFakeEventCount)).Select(i => i.ToString())
.Aggregate((acc, n) => acc + "," + n));
}
catch (Exception ex) { Console.WriteLine("faild to output timedout ids..."); }
break;
}
if (fireFakeEventCount != onNextCalledTimes)
{
Console.WriteLine("onNextOccurredTimes assert failed");
break;
}
if ((concurrentCount - fireFakeEventCount) != timeoutExceptionOccurredTimes)
{
Console.WriteLine("timeoutExceptionOccurredTimes assert failed");
break;
}
}
Console.WriteLine("");
Console.WriteLine("");
Console.WriteLine("DONE!");
Console.ReadLine();
}
}
public class EventFireCenter
{
public static event EventHandler<GTCommandTerminalEventArg> OnTerminalEventArrived;
private static object syncObject = new object();
public static void TestFireDummyEventWithId(int id)
{
lock (syncObject)
{
var safe = OnTerminalEventArrived;
if (safe != null)
{
safe(null, new GTCommandTerminalEventArg(id));
}
}
}
}
public class UnsolicitedEventCenter
{
private readonly static IObservable<int> publisher;
static UnsolicitedEventCenter()
{
publisher = Observable.FromEventPattern<GTCommandTerminalEventArg>(typeof(EventFireCenter), "OnTerminalEventArrived")
.Select(s => s.EventArgs.Id);
}
private UnsolicitedEventCenter() { }
/// <summary>
/// Gets the Publisher property to start observe an observable sequence.
/// </summary>
public static IObservable<int> Publisher { get { return publisher; } }
}
public class GTCommandTerminalEventArg : System.EventArgs
{
public GTCommandTerminalEventArg(int id)
{
this.Id = id;
}
public int Id { get; private set; }
}
}
很可能Timeout
没有触发,因为它在Where
过滤器之前。这意味着所有事件都通过并重置计时器,然后大多数事件被Where
子句过滤。对于订阅的观察者来说,它似乎永远不会得到结果,也永远不会触发超时。将Timeout
移动到Where
之后,您现在应该有一个系统,如果单个观察者没有及时获得预期的事件,则会超时。