我经常遇到需要某种阀门结构来控制反应管道流量的情况。通常,在基于网络的应用程序中,我需要根据连接状态打开/关闭请求流。
该阀门主体应支持打开/关闭流,并按FIFO顺序输出。当阀门关闭时,应缓冲输入值。
ConcurrentQueue
或BlockingCollection
通常用于此类场景,但这会立即将线程引入画面中。我一直在寻找一个完全被动的解决方案。
这里有一个主要基于Buffer()
和BehaviorSubject
的实现。行为主体跟踪阀门的打开/关闭状态。打开阀门开始缓冲窗口,关闭阀门关闭这些窗口。缓冲操作器的输出被"重新注入"到输入上(这样即使是观察者自己也可以关闭阀门):
/// <summary>
/// Subject offering Open() and Close() methods, with built-in buffering.
/// Note that closing the valve in the observer is supported.
/// </summary>
/// <remarks>As is the case with other Rx subjects, this class is not thread-safe, in that
/// order of elements in the output is indeterministic in the case of concurrent operation
/// of Open()/Close()/OnNext()/OnError(). To guarantee strict order of delivery even in the
/// case of concurrent access, <see cref="ValveSubjectExtensions.Synchronize{T}(NEXThink.Finder.Utils.Rx.IValveSubject{T})"/> can be used.</remarks>
/// <typeparam name="T">Elements type</typeparam>
public class ValveSubject<T> : IValveSubject<T>
{
private enum Valve
{
Open,
Closed
}
private readonly Subject<T> input = new Subject<T>();
private readonly BehaviorSubject<Valve> valveSubject = new BehaviorSubject<Valve>(Valve.Open);
private readonly Subject<T> output = new Subject<T>();
public ValveSubject()
{
var valveOperations = valveSubject.DistinctUntilChanged();
input.Buffer(
bufferOpenings: valveOperations.Where(v => v == Valve.Closed),
bufferClosingSelector: _ => valveOperations.Where(v => v == Valve.Open))
.SelectMany(t => t).Subscribe(input);
input.Where(t => valveSubject.Value == Valve.Open).Subscribe(output);
}
public bool IsOpen
{
get { return valveSubject.Value == Valve.Open; }
}
public bool IsClosed
{
get { return valveSubject.Value == Valve.Closed; }
}
public void OnNext(T value)
{
input.OnNext(value);
}
public void OnError(Exception error)
{
input.OnError(error);
}
public void OnCompleted()
{
output.OnCompleted();
input.OnCompleted();
valveSubject.OnCompleted();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return output.Subscribe(observer);
}
public void Open()
{
valveSubject.OnNext(Valve.Open);
}
public void Close()
{
valveSubject.OnNext(Valve.Closed);
}
}
public interface IValveSubject<T>:ISubject<T>
{
void Open();
void Close();
}
冲洗阀门的另一种方法有时也很有用,例如在不再相关时消除剩余请求。以下是一个基于先例的实现,适配器样式:
/// <summary>
/// Subject with same semantics as <see cref="ValveSubject{T}"/>, but adding flushing out capability
/// which allows clearing the valve of any remaining elements before closing.
/// </summary>
/// <typeparam name="T">Elements type</typeparam>
public class FlushableValveSubject<T> : IFlushableValveSubject<T>
{
private readonly BehaviorSubject<ValveSubject<T>> valvesSubject = new BehaviorSubject<ValveSubject<T>>(new ValveSubject<T>());
private ValveSubject<T> CurrentValve
{
get { return valvesSubject.Value; }
}
public bool IsOpen
{
get { return CurrentValve.IsOpen; }
}
public bool IsClosed
{
get { return CurrentValve.IsClosed; }
}
public void OnNext(T value)
{
CurrentValve.OnNext(value);
}
public void OnError(Exception error)
{
CurrentValve.OnError(error);
}
public void OnCompleted()
{
CurrentValve.OnCompleted();
valvesSubject.OnCompleted();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return valvesSubject.Switch().Subscribe(observer);
}
public void Open()
{
CurrentValve.Open();
}
public void Close()
{
CurrentValve.Close();
}
/// <summary>
/// Discards remaining elements in the valve and reset the valve into a closed state
/// </summary>
/// <returns>Replayable observable with any remaining elements</returns>
public IObservable<T> FlushAndClose()
{
var previousValve = CurrentValve;
valvesSubject.OnNext(CreateClosedValve());
var remainingElements = new ReplaySubject<T>();
previousValve.Subscribe(remainingElements);
previousValve.Open();
return remainingElements;
}
private static ValveSubject<T> CreateClosedValve()
{
var valve = new ValveSubject<T>();
valve.Close();
return valve;
}
}
public interface IFlushableValveSubject<T> : IValveSubject<T>
{
IObservable<T> FlushAndClose();
}
正如评论中所提到的,这些主题不是"线程安全的",因为在并发操作的情况下,交付顺序不再得到保证。以与标准Rx Subject
、Subject.Synchronize()
类似的方式(https://msdn.microsoft.com/en-us/library/hh211643%28v=vs.103%29.aspx)我们可以介绍一些在阀门周围提供锁定的扩展:
public static class ValveSubjectExtensions
{
public static IValveSubject<T> Synchronize<T>(this IValveSubject<T> valve)
{
return Synchronize(valve, new object());
}
public static IValveSubject<T> Synchronize<T>(this IValveSubject<T> valve, object gate)
{
return new SynchronizedValveAdapter<T>(valve, gate);
}
public static IFlushableValveSubject<T> Synchronize<T>(this IFlushableValveSubject<T> valve)
{
return Synchronize(valve, new object());
}
public static IFlushableValveSubject<T> Synchronize<T>(this IFlushableValveSubject<T> valve, object gate)
{
return new SynchronizedFlushableValveAdapter<T>(valve, gate);
}
}
internal class SynchronizedValveAdapter<T> : IValveSubject<T>
{
private readonly object gate;
private readonly IValveSubject<T> valve;
public SynchronizedValveAdapter(IValveSubject<T> valve, object gate)
{
this.valve = valve;
this.gate = gate;
}
public void OnNext(T value)
{
lock (gate)
{
valve.OnNext(value);
}
}
public void OnError(Exception error)
{
lock (gate)
{
valve.OnError(error);
}
}
public void OnCompleted()
{
lock (gate)
{
valve.OnCompleted();
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
return valve.Subscribe(observer);
}
public void Open()
{
lock (gate)
{
valve.Open();
}
}
public void Close()
{
lock (gate)
{
valve.Close();
}
}
}
internal class SynchronizedFlushableValveAdapter<T> : SynchronizedValveAdapter<T>, IFlushableValveSubject<T>
{
private readonly object gate;
private readonly IFlushableValveSubject<T> valve;
public SynchronizedFlushableValveAdapter(IFlushableValveSubject<T> valve, object gate)
: base(valve, gate)
{
this.valve = valve;
this.gate = gate;
}
public IObservable<T> FlushAndClose()
{
lock (gate)
{
return valve.FlushAndClose();
}
}
}
以下是我使用延迟运算符的实现:
source.delay(new Func1<Integer, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Integer integer) {
return valve.filter(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean aBoolean) {
return aBoolean;
}
});
}
})
.toBlocking()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("out: " + integer);
}
});
这个想法是将所有源排放推迟到"阀门打开"。如果阀门已经打开,则不会延迟项目的排放。
Rx阀门要点