目前我有一个任务由TPL调度器处理:
var result = await task;
是否可以通过将其放入ObservableCollection并由特定的Rx调度器处理来推迟其等待/执行:
var scheduler = new EventLoopScheduler();
Observable
.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
e => _collection.CollectionChanged += e,
e => collection.CollectionChanged -= e)
.Where(e => e.EventArgs.Action == NotifyCollectionChangedAction.Add)
.Select(e => e.EventArgs.NewItems.Cast<Task<T>>())
.ObserveOn(scheduler)
.Do(x => scheduler.Schedule(
() => Thread.Sleep(TimeSpan.FromSeconds(1)))) // once a second
.Subscribe(???);
所以调用代码不会更改:
var result = await Later(task);
var do= new Do();
var result = await do.Later(() => 1 + 2);
public class Do
{
private Subject<Action> _backlog = new Subject<Action>();
public Do()
{
Observable.CombineLatest(_backlog, Observable.Timer(...), (l, r) => l)
.Subscribe(x => x());
}
public Task<T> Later(Func<T> getResult)
{
var tcs = new TaskCompletionSource<T>();
_backlog.OnNext(() => {
try
{
var result = getResult();
tcs.SetResult(result);
}
catch(Exception ex)
{
tcs.SetException(ex);
}
});
return tcs.Task;
}
}
如果您计划过渡到RX,为什么要使用TPL?只需使用下面的内容,并使用您想要的任何调度程序。
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => "SomeResult")
.Subscribe(x => Console.WriteLine("{0} - {1}", DateTime.Now.Second, x));