如何将任务<T>放入 ObservableCollection 并使用 EventLoopScheduler 进行处理?



目前我有一个任务由TPL调度器处理:

var result = await task;

是否可以通过将其放入ObservableCollection并由特定的Rx调度器处理来推迟其等待/执行:

var scheduler = new EventLoopScheduler();
Observable
    .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
        e => _collection.CollectionChanged += e,
        e => collection.CollectionChanged -= e)
    .Where(e => e.EventArgs.Action == NotifyCollectionChangedAction.Add)
    .Select(e => e.EventArgs.NewItems.Cast<Task<T>>())
    .ObserveOn(scheduler)
    .Do(x => scheduler.Schedule(
        () => Thread.Sleep(TimeSpan.FromSeconds(1)))) // once a second
    .Subscribe(???);

所以调用代码不会更改:

var result = await Later(task);
var do= new Do();

var result = await do.Later(() => 1 + 2);

public class Do
{
    private Subject<Action> _backlog = new Subject<Action>();
    public Do()
    {
         Observable.CombineLatest(_backlog, Observable.Timer(...), (l, r) => l)
              .Subscribe(x => x());
    }
    public Task<T> Later(Func<T> getResult)
    {
        var tcs = new TaskCompletionSource<T>();
        _backlog.OnNext(() => {
             try
             {
                 var result = getResult();
                 tcs.SetResult(result);
             }
             catch(Exception ex)
             {
                 tcs.SetException(ex);
             }
        });
        return tcs.Task;
    }
}

如果您计划过渡到RX,为什么要使用TPL?只需使用下面的内容,并使用您想要的任何调度程序。

Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => "SomeResult")
            .Subscribe(x => Console.WriteLine("{0} - {1}", DateTime.Now.Second, x));

相关内容

  • 没有找到相关文章

最新更新