我正在使用rx来观察一个事件。只有一个订阅者的操作很长。
当上一个操作未完成时,该事件可能会再次触发。如何取消此活动?
以下是我正在搜索的内容:类似于SubscribeWithDebounceAsync方法:
var observable = Observable.FromEventPattern<T>(obj, "OnSomeEvent");
observable.SubscribeWithDebounceAsync( ep => .... );
您的问题听起来像是希望在事件发生时执行此操作,而您当前没有执行该操作。您的评论表明您只想执行一次操作。
后者更简单:
var observable = Observable.FromEventPattern<T>(...);
var disp = observable.Take(1).Subscribe(ep => ...);
对于前者,我假设您打算在另一个线程上执行这个长时间运行的操作。如果你不这样做,你会挂断Observable正在引发消息的线程。SerialDisposable
是你在这里的朋友:
<Extension()>
Public Function IgnoreWhileExecuting(Of T)(source As IObservable(Of T),
onNext As Func(Of T, Task),
onError As Action(Of Exception),
onCompleted As Action
) As IDisposable
'argument validation/error handling skipped for sample
Dim serial As New SerialDisposable
Dim syncNext As Action(Of T) = Nothing
'this function will be called for the initial and subsequent subscriptions
Dim subscribe As Func(Of IDisposable) =
Function()
Return source.Subscribe(syncNext,
onError,
onCompleted)
End Function
'this function will be "suspend" the subscription while executing
'the long-running operation, but it must return immediately to
syncNext = Sub(v)
'stop the current subscription to the source
serial.Disposable.Dispose()
'perform the long running operation and follow
'with resubscription. This will resubscribe regardless
'of if the task completes successfully or not
onNext(v).ContinueWith(Sub() serial.Disposable = subscribe())
End Sub
serial.Disposable = subscribe()
Return serial
End Function
由于这将订阅和取消订阅,它通常假设您有一个热门的可观察对象,就像您在问题中使用的事件可观察对象一样。你可以在控制台应用程序中进行测试。
Sub Main()
Dim left = Observable.Interval(TimeSpan.FromMilliseconds(500))
Dim leftHot = left.Do(Sub(v) WriteTimestamped("Tick {0}", v)).Publish()
Dim f As New TaskFactory
Dim disp = leftHot _
.IgnoreWhileExecuting(Function(v)
Return f.StartNew(Sub(tparam)
WriteTimestamped("Before sleep {0}", tparam)
Thread.Sleep(2000)
WriteTimestamped("After sleep {0}", tparam)
End Sub,
v)
End Function,
Sub(ex) Console.WriteLine("Error in ignore: " & ex.ToString()),
Sub() Console.WriteLine("Completed from ignore"))
Dim con = leftHot.Connect()
Console.ReadKey()
disp.Dispose()
Console.ReadKey()
con.Dispose()
Console.ReadKey()
End Sub
Private Sub WriteTimestamped(ByVal format As String, ByVal arg As Object)
Console.WriteLine(Date.Now.ToString("HH:mm:ss.f") & " " & String.Format(format, arg))
End Sub
要查看热和冷之间的区别,请删除Publish
调用和相应的Connect
以及连接处理,然后再次运行示例。
这里有一个不同的实现:
public static class Extensions
{
public static IDisposable SubscribeWithDebounceAsync<T>(
this IObservable<T> source,
Action<T> longRunningTask)
{
var finished = new Subject<Unit>();
var debounced = source
.SkipUntil(finished)
.Take(1)
.Repeat()
.Subscribe(
t => Observable.Start(() =>
{
longRunningTask(t);
finished.OnNext(Unit.Default);
}));
finished.OnNext(Unit.Default);
return debounced;
}
}
最后一个完成了。OnNext是用来启动进程的,否则它将永远停在SkippUntil(完成)上。
编辑:使代码更加简洁。