使用Rx,我希望在以下代码中使用暂停和恢复功能:
如何实现Pause()和Resume() ?
static IDisposable _subscription;
static void Main(string[] args)
{
Subscribe();
Thread.Sleep(500);
// Second value should not be shown after two seconds:
Pause();
Thread.Sleep(5000);
// Continue and show second value and beyond now:
Resume();
}
static void Subscribe()
{
var list = new List<int> { 1, 2, 3, 4, 5 };
var obs = list.ToObservable();
_subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
{
Console.WriteLine(p.ToString());
Thread.Sleep(2000);
},
err => Console.WriteLine("Error"),
() => Console.WriteLine("Sequence Completed")
);
}
static void Pause()
{
// Pseudocode:
//_subscription.Pause();
}
static void Resume()
{
// Pseudocode:
//_subscription.Resume();
}
Rx解决方案吗?
我相信我可以使它工作与某种布尔域门与线程锁定相结合(
Monitor.Wait
和Monitor.Pulse
)但是是否存在Rx操作符或其他一些反应性简写来实现相同的目标?
这里有一个相当简单的Rx方法来做您想做的事情。我已经创建了一个名为Pausable
的扩展方法,它接受一个源可观察对象和一个布尔值的第二个可观察对象,暂停或恢复可观察对象。
public static IObservable<T> Pausable<T>(
this IObservable<T> source,
IObservable<bool> pauser)
{
return Observable.Create<T>(o =>
{
var paused = new SerialDisposable();
var subscription = Observable.Publish(source, ps =>
{
var values = new ReplaySubject<T>();
Func<bool, IObservable<T>> switcher = b =>
{
if (b)
{
values.Dispose();
values = new ReplaySubject<T>();
paused.Disposable = ps.Subscribe(values);
return Observable.Empty<T>();
}
else
{
return values.Concat(ps);
}
};
return pauser.StartWith(false).DistinctUntilChanged()
.Select(p => switcher(p))
.Switch();
}).Subscribe(o);
return new CompositeDisposable(subscription, paused);
});
}
可以这样使用:
var xs = Observable.Generate(
0,
x => x < 100,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(0.1));
var bs = new Subject<bool>();
var pxs = xs.Pausable(bs);
pxs.Subscribe(x => { /* Do stuff */ });
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
对你来说,用Pause
&Resume
方法。
这是IConnectableObservable的一个应用程序,我为更新的api(原来在这里)稍微纠正了一下:
public static class ObservableHelper {
public static IConnectableObservable<TSource> WhileResumable<TSource>(Func<bool> condition, IObservable<TSource> source) {
var buffer = new Queue<TSource>();
var subscriptionsCount = 0;
var isRunning = System.Reactive.Disposables.Disposable.Create(() => {
lock (buffer)
{
subscriptionsCount--;
}
});
var raw = Observable.Create<TSource>(subscriber => {
lock (buffer)
{
subscriptionsCount++;
if (subscriptionsCount == 1)
{
while (buffer.Count > 0) {
subscriber.OnNext(buffer.Dequeue());
}
Observable.While(() => subscriptionsCount > 0 && condition(), source)
.Subscribe(
v => { if (subscriptionsCount == 0) buffer.Enqueue(v); else subscriber.OnNext(v); },
e => subscriber.OnError(e),
() => { if (subscriptionsCount > 0) subscriber.OnCompleted(); }
);
}
}
return isRunning;
});
return raw.Publish();
}
}
以下是我的答案。我相信在pause resume周围可能存在竞争条件,但是可以通过将所有活动序列化到调度程序来减轻这种情况。(序列化优于同步)。
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;
using NUnit.Framework;
namespace StackOverflow.Tests.Q7620182_PauseResume
{
[TestFixture]
public class PauseAndResumeTests
{
[Test]
public void Should_pause_and_resume()
{
//Arrange
var scheduler = new TestScheduler();
var isRunningTrigger = new BehaviorSubject<bool>(true);
Action pause = () => isRunningTrigger.OnNext(false);
Action resume = () => isRunningTrigger.OnNext(true);
var source = scheduler.CreateHotObservable(
ReactiveTest.OnNext(0.1.Seconds(), 1),
ReactiveTest.OnNext(2.0.Seconds(), 2),
ReactiveTest.OnNext(4.0.Seconds(), 3),
ReactiveTest.OnNext(6.0.Seconds(), 4),
ReactiveTest.OnNext(8.0.Seconds(), 5));
scheduler.Schedule(TimeSpan.FromSeconds(0.5), () => { pause(); });
scheduler.Schedule(TimeSpan.FromSeconds(5.0), () => { resume(); });
//Act
var sut = Observable.Create<IObservable<int>>(o =>
{
var current = source.Replay();
var connection = new SerialDisposable();
connection.Disposable = current.Connect();
return isRunningTrigger
.DistinctUntilChanged()
.Select(isRunning =>
{
if (isRunning)
{
//Return the current replayed values.
return current;
}
else
{
//Disconnect and replace current.
current = source.Replay();
connection.Disposable = current.Connect();
//yield silence until the next time we resume.
return Observable.Never<int>();
}
})
.Subscribe(o);
}).Switch();
var observer = scheduler.CreateObserver<int>();
using (sut.Subscribe(observer))
{
scheduler.Start();
}
//Assert
var expected = new[]
{
ReactiveTest.OnNext(0.1.Seconds(), 1),
ReactiveTest.OnNext(5.0.Seconds(), 2),
ReactiveTest.OnNext(5.0.Seconds(), 3),
ReactiveTest.OnNext(6.0.Seconds(), 4),
ReactiveTest.OnNext(8.0.Seconds(), 5)
};
CollectionAssert.AreEqual(expected, observer.Messages);
}
}
}
它只是工作:
class SimpleWaitPulse
{
static readonly object _locker = new object();
static bool _go;
static void Main()
{ // The new thread will block
new Thread (Work).Start(); // because _go==false.
Console.ReadLine(); // Wait for user to hit Enter
lock (_locker) // Let's now wake up the thread by
{ // setting _go=true and pulsing.
_go = true;
Monitor.Pulse (_locker);
}
}
static void Work()
{
lock (_locker)
while (!_go)
Monitor.Wait (_locker); // Lock is released while we’re waiting
Console.WriteLine ("Woken!!!");
}
}
请参见如何使用等待和脉冲获取更多详细信息