我有两个需要按顺序处理的IDisposables
。排序很重要,因为第一个IDisposable
杀死了一个依赖于将被第二个IDisposable
杀死的服务的Rx订阅。这是在Windows窗体应用程序中,IObservable
的订阅需要发生在不同的线程上,但观察和处理需要发生在UI线程上。(实际上,只要顺序得到保证,我并不关心处理是否发生在UI线程上。)因此,在代码中,我大致有以下内容(一次简化):
SomeService = new DisposableService();
Subscription = Foo(someService).SubscribeOn(NewThreadScheduler.Default).ObserveOn(theForm).Subscribe(...)
对于许多UI事件,我需要按顺序处理这两个事件(Subscription,然后是SomeService)。为此,我尝试使用Rx的CompositeDisposable
和ContextDisposable
在同一线程上提供串行处理:
_Disposable = new CompositeDisposable(new[] {
new ContextDisposable(WindowsFormsSynchronizationContext.Current, Subscription),
new ContextDisposable(WindowsFormsSynchronizationContext.Current, SomeService)});
然而,上面的不起作用。根据我的日志记录,_Disposable
和SomeService
的ContextDisposable
在同一个线程上被调用,但ContextDisposable
仍然在与被处置的服务并发的不同线程上发生(从而导致竞争条件和npe)。
我只编程c#几个星期,所以我确定问题是我误解了上下文和调度程序的工作方式。解决这个问题的正确方法是什么?
除非我误解了什么,否则您可以控制哪个线程处理什么。谁订阅哪个线程并不重要。看看这个例子
internal class Program
{
private static void Main(string[] args)
{
ReactiveTest rx1 = null;
ReactiveTest rx2 = null;
var thread1 = new Thread(() => rx1 = new ReactiveTest());
var thread2 = new Thread(() => rx2 = new ReactiveTest());
thread1.Start();
thread2.Start();
Thread.Sleep(TimeSpan.FromSeconds(1));
thread1.Join();
thread2.Join();
rx1.Dispose();
rx2.Dispose();
}
}
public class ReactiveTest : IDisposable
{
private IDisposable _timerObservable;
private object _lock = new object();
public ReactiveTest()
{
_timerObservable = Observable.Interval(TimeSpan.FromMilliseconds(250)).Subscribe(i =>
Console.WriteLine("[{0}] - {1}", Thread.CurrentThread.ManagedThreadId, i));
}
public void Dispose()
{
lock (_lock)
{
_timerObservable.Dispose();
Console.WriteLine("[{0}] - DISPOSING", Thread.CurrentThread.ManagedThreadId);
}
}
}
输出
[14] - 0
[7] - 0
[15] - 1
[7] - 1
[14] - 2
[15] - 2
[10] - DISPOSING
[10] - DISPOSING
你可以看到我们在两个独立的线程上订阅,然后在第三个线程上释放。我只是锁定了dispose,以防在订阅中需要发生线程安全的事情。
SubscribeOn调度对Subscribe
和Dispose
的调用。因此,在Subscription变量上调用Dispose
,无论当前是否在UI线程上执行,都会导致订阅被NewThreadScheduler.Default
调度处置。
使用SubscribeOn
几乎从来不是个好主意;然而,在您的案例中,您声称它解决了50%的问题——这比我见过的大多数使用都多50%——所以我必须质疑您是否真的需要在后台线程上执行订阅。如果方法所做的只是开始一些异步工作,如发送网络请求或读取文件,那么创建一个全新的线程然后调用它的方法,与直接调用UI线程上的方法相比,代价是非常昂贵的。如果计算要发送的网络消息被证明过于耗时,那么使用SubscribeOn
可能是正确的;虽然,当然,只有当您也希望安排处置时才会这样做。
如果对可观察对象的订阅必须在后台线程中执行,而dispose必须保持自由线程状态,则考虑使用以下操作符(未经测试)。
public static class ObservableExtensions
{
public static IObservable<TSource> SubscribeOn<TSource>(
this IObservable<TSource> source,
bool doNotScheduleDisposal,
IScheduler scheduler)
{
if (!doNotScheduleDisposal)
{
return source.SubscribeOn(scheduler);
}
return Observable.Create<TSource>(observer =>
{
// Implementation is based on that of the native SubscribeOn operator in Rx
var s = new SingleAssignmentDisposable();
var d = new SerialDisposable();
d.Disposable = s;
s.Disposable = scheduler.Schedule(() =>
{
d.Disposable = source.SubscribeSafe(observer);
});
return d;
});
}
}