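Using Reactive Extensions, I want to ignore messages from my event stream that arrive while my Subscribe method is running. That is, processing a message sometimes takes longer than the interval between messages, so I want to drop the messages I don't have time to process.
However, when my Subscribe method completes, if any messages did come through in the meantime, I want to process the last one. So I always process the most recent message.
So, if I have some code which does: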
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
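If we assume that "100" takes a long time to process, then I want "2" to be processed when "100" completes. "1" should be ignored because it was superseded by "2" while "100" was still being processed.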
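To make the intended semantics concrete, here is a minimal single-threaded sketch that does not use Rx at all. `LatestOnlySlot` and `LatestOnlyDemo` are hypothetical names introduced only for illustration: the slot holds at most one unprocessed message, and each new message overwrites whatever was waiting.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: holds at most one pending message; newer messages overwrite older ones.
public sealed class LatestOnlySlot<T>
{
    private T _pending;
    private bool _hasPending;

    // Called by the producer; replaces whatever was waiting.
    public void Offer(T value)
    {
        _pending = value;
        _hasPending = true;
    }

    // Called by the consumer when it is free; returns false if nothing is waiting.
    public bool TryTake(out T value)
    {
        value = _pending;
        bool had = _hasPending;
        _pending = default(T);
        _hasPending = false;
        return had;
    }
}

public static class LatestOnlyDemo
{
    // Replays the scenario from the question: 100 is being processed while 1 and 2 arrive.
    public static List<int> Run()
    {
        var processed = new List<int>();
        var slot = new LatestOnlySlot<int>();

        slot.Offer(100);
        int current;
        slot.TryTake(out current);   // the consumer starts working on 100
        slot.Offer(1);               // arrives while 100 is still being processed
        slot.Offer(2);               // supersedes 1
        processed.Add(current);      // 100 finishes

        while (slot.TryTake(out current))
            processed.Add(current);  // only 2 is left to process

        return processed;            // contains 100, then 2
    }
}
```

Calling `LatestOnlyDemo.Run()` yields a list containing 100 followed by 2; the message 1 is never processed. This sketch ignores threading entirely, which is the hard part of the actual problem.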
Here's an example of the result I want, using a background task and Latest():
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
    foreach (var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});
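But Latest() is a blocking call, and I'd prefer not to have a thread sitting there waiting for the next value like this (there will sometimes be very long gaps between messages).
I can also get the result I want by using a BroadcastBlock from TPL Dataflow, like this: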
var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });
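But this feels like it should be possible directly in Rx. What's the best way to go about it?
Here is a method that is similar to Dave's but uses Sample instead (which is more appropriate than Buffer). I've included an extension method similar to the one I added to Dave's answer.
The extension: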
public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();
    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });
    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));
    return sub;
}
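Note that it's simpler, and no "empty" buffer is fired. The first element sent to the action actually comes from the stream itself.
Usage is straightforward: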
messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});
messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing
Results:
source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
Thanks to Lee Campbell (of Intro To Rx fame), I now have a working solution using this extension method:
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }
            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}
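Here is an attempt using "just" Rx. The timer and the subscriber are kept independent by observing on the thread pool, and I've used a subject to provide feedback when the task completes.
I don't think this is a simple solution, but I hope it gives you some ideas for improvement.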
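// feedback was not declared in the snippet; Subject<Unit> matches the extension method version
var feedback = new Subject<Unit>();
messages.
    Buffer(() => feedback).
    Select(l => l.LastOrDefault()).
    ObserveOn(Scheduler.ThreadPool).
    Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
        feedback.OnNext(Unit.Default);
    });
feedback.OnNext(Unit.Default);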
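There is one slight problem: the buffer is first closed while it is still empty, so it produces the default value. You could probably solve that by sending the feedback only after the first message.
Here it is as an extension function: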
public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var feedback = new Subject<Unit>();
    var sub = source.
        Buffer(() => feedback).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l.LastOrDefault());
            feedback.OnNext(Unit.Default);
        });
    feedback.OnNext(Unit.Default);
    return sub;
}
And usage:
messages.SubscribeWithoutOverlap(n =>
{
    Thread.Sleep(1000);
    Console.WriteLine(n);
});
An example using Observable.Switch. It also handles the case where you finish the task but there is nothing in the queue.
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive
{
    public static class RXX
    {
        public static IDisposable SubscribeWithoutOverlap<T>
            ( this IObservable<T> source
            , Action<T> action
            , IScheduler scheduler = null)
        {
            var sampler = new Subject<Unit>();
            scheduler = scheduler ?? Scheduler.Default;
            var p = source.Publish();
            var connection = p.Connect();
            var subscription = sampler.Select(x => p.Take(1))
                .Switch()
                .ObserveOn(scheduler)
                .Subscribe(l =>
                {
                    action(l);
                    sampler.OnNext(Unit.Default);
                });
            sampler.OnNext(Unit.Default);
            return new CompositeDisposable(connection, subscription);
        }
    }
}
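I've written a blog post about this problem with a solution that uses CAS instead of locks and avoids recursion. The code is below, but you can find a complete explanation here: http://www.zerobugbuild.com/?p=192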
public static IObservable<TSource> ObserveLatestOn<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler)
{
    return Observable.Create<TSource>(observer =>
    {
        Notification<TSource> pendingNotification = null;
        var cancelable = new MultipleAssignmentDisposable();
        var sourceSubscription = source.Materialize()
            .Subscribe(notification =>
            {
                var previousNotification = Interlocked.Exchange(
                    ref pendingNotification, notification);
                if (previousNotification != null) return;
                cancelable.Disposable = scheduler.Schedule(() =>
                {
                    var notificationToSend = Interlocked.Exchange(
                        ref pendingNotification, null);
                    notificationToSend.Accept(observer);
                });
            });
        return new CompositeDisposable(sourceSubscription, cancelable);
    });
}
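The hand-off at the heart of this approach can be looked at in isolation from Rx. The sketch below is only an illustration, not the blog post's code; `PendingSlot`, `Publish`, and `Drain` are hypothetical names. `Interlocked.Exchange` stores the new value and returns the previous one in a single atomic step, so a producer can tell whether a drain is already outstanding.

```csharp
using System;
using System.Threading;

// Hypothetical illustration of the lock-free hand-off used by ObserveLatestOn.
public sealed class PendingSlot<T> where T : class
{
    private T _pending;

    // Stores the newest item. Returns true only when the slot was empty,
    // i.e. when the caller is the one responsible for scheduling a drain.
    public bool Publish(T item)
    {
        T previous = Interlocked.Exchange(ref _pending, item);
        return previous == null;
    }

    // Atomically removes and returns the newest item (null if there is none).
    public T Drain()
    {
        return Interlocked.Exchange(ref _pending, null);
    }
}
```

With an empty slot, `Publish("a")` returns true and a following `Publish("b")` returns false; `Drain()` then returns "b", and a second `Drain()` returns null. Only the publisher that saw an empty slot needs to schedule work, which is why no lock is required.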
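Here's a Task-based implementation, with cancellation semantics, that doesn't use a subject. Calling Dispose allows the subscribed action to cancel its processing, if desired.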
public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action)
{
    var cancellation = new CancellationDisposable();
    var token = cancellation.Token;
    Task task = null;
    return new CompositeDisposable(
        cancellation,
        observable.Subscribe(value =>
        {
            if (task == null || task.IsCompleted)
                task = Task.Factory.StartNew(() => action(value, token), token);
        })
    );
}
Here's a simple test:
Observable.Interval(TimeSpan.FromMilliseconds(150))
    .SampleSubscribe((v, ct) =>
    {
        // check for cancellation, do work
        for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++)
            Thread.Sleep(100);
        Console.WriteLine(v);
    });
Output:
0
7
14
21
28
35
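With Rx 2.0 RC you can use Chunkify to get an IEnumerable of lists, each containing what was observed since the last MoveNext.
You can then use ToObservable to convert this back to an IObservable and only pay attention to the last entry in each non-empty list.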
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
messages.Chunkify()
    .ToObservable(Scheduler.TaskPool)
    .Where(list => list.Any())
    .Select(list => list.Last())
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });
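I've just finished (and already completely revised) my own solution to the problem, which I plan to use in production.
Unless the scheduler uses the current thread, calls to OnNext, OnCompleted, and OnError from the source should return immediately; if the observer is busy with previous notifications, they go into a queue with a specifiable maximum size and are delivered once the previous notification has been processed. If the queue fills up, the least recent items are discarded. So a maximum queue size of 0 ignores all items that arrive while the observer is busy; a size of 1 always lets the observer see the latest item; a size of int.MaxValue keeps the consumer busy until it catches up with the producer.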
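The effect of the three queue sizes can be tried out with a much simpler stand-in than the implementation in this answer. The following is a sketch only: `DropOldestQueue` is a hypothetical class built on the standard `Queue<T>`, with no thread safety and none of the ring-buffer logic.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for the bounded queue described above:
// when full, the oldest item is dropped to make room for the newest.
public sealed class DropOldestQueue<T>
{
    private readonly int _maxSize;
    private readonly Queue<T> _items = new Queue<T>();

    public DropOldestQueue(int maxSize)
    {
        if (maxSize < 0) throw new ArgumentOutOfRangeException(nameof(maxSize));
        _maxSize = maxSize;
    }

    public int Count { get { return _items.Count; } }

    public void Enqueue(T item)
    {
        if (_maxSize == 0) return;                       // size 0: nothing is ever buffered
        if (_items.Count == _maxSize) _items.Dequeue();  // full: discard the least recent item
        _items.Enqueue(item);
    }

    public T Dequeue()
    {
        return _items.Dequeue();  // throws InvalidOperationException when empty
    }
}
```

Enqueuing 1, 2, 3 into a queue of size 1 leaves only 3; a queue of size 2 leaves 2 and 3; a queue of size 0 stays empty.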
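If the scheduler supports long-running work (i.e. gives you a thread of your own), I schedule a loop to notify the observer; otherwise I use recursive scheduling.
Here's the code. Any comments are appreciated.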
partial class MoreObservables
{
/// <summary>
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
/// </summary>
/// <param name="source">The source sequence.</param>
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
/// <remarks>
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
/// </remarks>
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
if (scheduler == null) scheduler = Scheduler.Default;
return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer));
}
private static class LatestImpl<TSource>
{
public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
var longrunningScheduler = scheduler.AsLongRunning();
if (longrunningScheduler != null)
return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer);
return new RecursiveSubscription(source, maxQueueSize, scheduler, observer);
}
#region Subscriptions
/// <summary>
/// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop.
/// </summary>
private sealed class LoopSubscription : IDisposable
{
private enum State
{
Idle, // nothing to notify
Head, // next notification is in _head
Queue, // next notifications are in _queue, followed by _completion
Disposed, // disposed
}
private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
private readonly IObserver<TSource> _observer;
private State _state;
private TSource _head; // item in front of the queue
private IQueue _queue; // queued items
private Notification<TSource> _completion; // completion notification
public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer)
{
_observer = observer;
_queue = Queue.Create(maxQueueSize);
scheduler.ScheduleLongRunning(_ => Loop());
_subscription.Disposable = source.Subscribe(
OnNext,
error => OnCompletion(Notification.CreateOnError<TSource>(error)),
() => OnCompletion(Notification.CreateOnCompleted<TSource>()));
}
private void OnNext(TSource value)
{
lock (_subscription)
{
switch (_state)
{
case State.Idle:
_head = value;
_state = State.Head;
Monitor.Pulse(_subscription);
break;
case State.Head:
case State.Queue:
if (_completion != null) return;
try { _queue.Enqueue(value); }
catch (Exception error) // probably OutOfMemoryException
{
_completion = Notification.CreateOnError<TSource>(error);
_subscription.Dispose();
}
break;
}
}
}
private void OnCompletion(Notification<TSource> completion)
{
lock (_subscription)
{
switch (_state)
{
case State.Idle:
_completion = completion;
_state = State.Queue;
Monitor.Pulse(_subscription);
_subscription.Dispose();
break;
case State.Head:
case State.Queue:
if (_completion != null) return;
_completion = completion;
_subscription.Dispose();
break;
}
}
}
public void Dispose()
{
lock (_subscription)
{
if (_state == State.Disposed) return;
_head = default(TSource);
_queue = null;
_completion = null;
_state = State.Disposed;
Monitor.Pulse(_subscription);
_subscription.Dispose();
}
}
private void Loop()
{
try
{
while (true) // overall loop for all notifications
{
// next notification to emit
Notification<TSource> completion;
TSource next; // iff completion == null
lock (_subscription)
{
while (true)
{
while (_state == State.Idle)
Monitor.Wait(_subscription);
if (_state == State.Head)
{
completion = null;
next = _head;
_head = default(TSource);
_state = State.Queue;
break;
}
if (_state == State.Queue)
{
if (!_queue.IsEmpty)
{
completion = null;
next = _queue.Dequeue(); // assumption: this never throws
break;
}
if (_completion != null)
{
completion = _completion;
next = default(TSource);
break;
}
_state = State.Idle;
continue;
}
Debug.Assert(_state == State.Disposed);
return;
}
}
if (completion != null)
{
completion.Accept(_observer);
return;
}
_observer.OnNext(next);
}
}
finally { Dispose(); }
}
}
/// <summary>
/// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively.
/// </summary>
private sealed class RecursiveSubscription : IDisposable
{
private enum State
{
Idle, // nothing to notify
Scheduled, // emitter scheduled or executing
Disposed, // disposed
}
private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action
private readonly IScheduler _scheduler;
private readonly IObserver<TSource> _observer;
private State _state;
private IQueue _queue; // queued items
private Notification<TSource> _completion; // completion notification
public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
{
_scheduler = scheduler;
_observer = observer;
_queue = Queue.Create(maxQueueSize);
_subscription.Disposable = source.Subscribe(
OnNext,
error => OnCompletion(Notification.CreateOnError<TSource>(error)),
() => OnCompletion(Notification.CreateOnCompleted<TSource>()));
}
private void OnNext(TSource value)
{
lock (_subscription)
{
switch (_state)
{
case State.Idle:
_emitter.Disposable = _scheduler.Schedule(value, EmitNext);
_state = State.Scheduled;
break;
case State.Scheduled:
if (_completion != null) return;
try { _queue.Enqueue(value); }
catch (Exception error) // probably OutOfMemoryException
{
_completion = Notification.CreateOnError<TSource>(error);
_subscription.Dispose();
}
break;
}
}
}
private void OnCompletion(Notification<TSource> completion)
{
lock (_subscription)
{
switch (_state)
{
case State.Idle:
_completion = completion;
_emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion));
_state = State.Scheduled;
_subscription.Dispose();
break;
case State.Scheduled:
if (_completion != null) return;
_completion = completion;
_subscription.Dispose();
break;
}
}
}
public void Dispose()
{
lock (_subscription)
{
if (_state == State.Disposed) return;
_emitter.Dispose();
_queue = null;
_completion = null;
_state = State.Disposed;
_subscription.Dispose();
}
}
private void EmitNext(TSource value, Action<TSource> self)
{
try { _observer.OnNext(value); }
catch { Dispose(); return; }
lock (_subscription)
{
if (_state == State.Disposed) return;
Debug.Assert(_state == State.Scheduled);
if (!_queue.IsEmpty)
self(_queue.Dequeue());
else if (_completion != null)
_emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion));
else
_state = State.Idle;
}
}
private void EmitCompletion(Notification<TSource> completion)
{
try { completion.Accept(_observer); }
finally { Dispose(); }
}
}
#endregion
#region IQueue
/// <summary>
/// FIFO queue that discards least recent items if size limit is reached.
/// </summary>
private interface IQueue
{
bool IsEmpty { get; }
void Enqueue(TSource item);
TSource Dequeue();
}
/// <summary>
/// <see cref="IQueue"/> implementations.
/// </summary>
private static class Queue
{
public static IQueue Create(int maxSize)
{
switch (maxSize)
{
case 0: return Zero.Instance;
case 1: return new One();
default: return new Many(maxSize);
}
}
private sealed class Zero : IQueue
{
// ReSharper disable once StaticMemberInGenericType
public static Zero Instance { get; } = new Zero();
private Zero() { }
public bool IsEmpty => true;
public void Enqueue(TSource item) { }
public TSource Dequeue() { throw new InvalidOperationException(); }
}
private sealed class One : IQueue
{
private TSource _item;
public bool IsEmpty { get; private set; } = true;
public void Enqueue(TSource item)
{
_item = item;
IsEmpty = false;
}
public TSource Dequeue()
{
if (IsEmpty) throw new InvalidOperationException();
var item = _item;
_item = default(TSource);
IsEmpty = true;
return item;
}
}
private sealed class Many : IQueue
{
private readonly int _maxSize, _initialSize;
private int _deq, _enq; // indices of dequeue and enqueue positions
private TSource[] _buffer;
public Many(int maxSize)
{
if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize));
_maxSize = maxSize;
if (maxSize == int.MaxValue)
_initialSize = 4;
else
{
// choose an initial size that won't get us too close to maxSize when doubling
_initialSize = maxSize;
while (_initialSize >= 7)
_initialSize = (_initialSize + 1) / 2;
}
}
public bool IsEmpty { get; private set; } = true;
public void Enqueue(TSource item)
{
if (IsEmpty)
{
if (_buffer == null) _buffer = new TSource[_initialSize];
_buffer[0] = item;
_deq = 0;
_enq = 1;
IsEmpty = false;
return;
}
if (_deq == _enq) // full
{
if (_buffer.Length == _maxSize) // overwrite least recent
{
_buffer[_enq] = item;
if (++_enq == _buffer.Length) _enq = 0;
_deq = _enq;
return;
}
// increase buffer size
var newSize = _buffer.Length >= _maxSize / 2 ? _maxSize : 2 * _buffer.Length;
var newBuffer = new TSource[newSize];
var count = _buffer.Length - _deq;
Array.Copy(_buffer, _deq, newBuffer, 0, count);
Array.Copy(_buffer, 0, newBuffer, count, _deq);
_deq = 0;
_enq = _buffer.Length;
_buffer = newBuffer;
}
_buffer[_enq] = item;
if (++_enq == _buffer.Length) _enq = 0;
}
public TSource Dequeue()
{
if (IsEmpty) throw new InvalidOperationException();
var result = ReadAndClear(ref _buffer[_deq]);
if (++_deq == _buffer.Length) _deq = 0;
if (_deq == _enq)
{
IsEmpty = true;
if (_buffer.Length > _initialSize) _buffer = null;
}
return result;
}
private static TSource ReadAndClear(ref TSource item)
{
var result = item;
item = default(TSource);
return result;
}
}
}
#endregion
}
}
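Yet another solution.
This one isn't pretty, because it mixes Task and Observable, so it can't really be tested with ReactiveTest (although, to be honest, I'm not sure how I'd implement a "slow" subscriber with ReactiveTest either).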
public static IObservable<T> ShedLoad<T>(this IObservable<T> source)
{
    return Observable.Create<T>(observer =>
    {
        Task task = Task.FromResult(0);
        return source.Subscribe(t =>
        {
            if (task.IsCompleted)
                task = Task.Run(() => observer.OnNext(t));
            else
                Debug.WriteLine("Skip, task not finished");
        }, observer.OnError, observer.OnCompleted);
    });
}
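I'm guessing there could be race conditions in there, but in my opinion, if we're at the stage where we're discarding things because they arrive too fast, I don't mind discarding one too many or one too few. Oh, and each OnNext is (potentially) called on a different thread (I guess I could put a Synchronize on the back of the Create).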
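If the check-and-start step ever did need to be safe against concurrent callers, one option would be an atomic busy flag instead of inspecting the task. This is only a sketch of that idea, not part of ShedLoad; `BusyGate`, `TryEnter`, and `Exit` are hypothetical names.

```csharp
using System;
using System.Threading;

// Hypothetical gate: at most one caller holds it at a time; others are told to skip.
public sealed class BusyGate
{
    private int _busy; // 0 = free, 1 = busy

    // Returns true if the caller acquired the gate, false if work is already in progress.
    public bool TryEnter()
    {
        return Interlocked.CompareExchange(ref _busy, 1, 0) == 0;
    }

    // Marks the gate as free again; call this when the work has finished.
    public void Exit()
    {
        Volatile.Write(ref _busy, 0);
    }
}
```

A subscriber would call `TryEnter()` for each incoming item, skip the item when it returns false, and call `Exit()` in a finally block once the work is done.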
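I admit that I couldn't get the Materialize extension to work properly (I hooked it up to FromEventPattern(MouseMove) and then subscribed with a deliberately slow subscriber, and oddly it let bursts of events through rather than one at a time).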