反应式扩展可观察.FromAsync:如何等到异步操作完成



在我的应用程序中,我从消息总线(兔子mq,但实际上这并不重要(获取事件。此事件的处理需要相当长的时间,并且事件是突发生成的。一次只能处理一个事件。为了实现这一点,我使用 Rx 来序列化事件并异步执行处理,以免阻塞生产者。

下面是(简化的(示例代码:

static void Main(string[] args)
{
var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
var subscription = input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
.Concat()
.Subscribe();
Console.ReadLine();
subscription.Dispose();
// Wait until finished?
Console.WriteLine("Completed");
}
private static async Task Process(CancellationToken cts, long i)
{
Console.WriteLine($"Processing {i} ...");
await Task.Delay(1000).ConfigureAwait(false);
Console.WriteLine($"Finished processing {i}");
}

示例应用程序正在释放订阅,然后应用程序将终止。但在此示例中,应用程序正在终止,而订阅之前收到的最后一个事件仍在处理中。

问:在释放订阅后等待最后一个事件处理的最佳方法是什么?我仍在尝试围绕异步 Rx 的东西进行思考。我认为有一种相当简单的方法可以做到这一点,我现在只是没有看到。

如果您对适用于此类简单情况的快速简便解决方案感到满意,那么您只需WaitIObservable而不是订阅它。如果您想接收类似订阅的通知,请在最终Wait之前使用Do运算符:

static void Main(string[] args)
{
var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
.Concat()
.Do(onNext: x => { }, onError: ex => { }, onCompleted: () => { })
.Wait();
Console.WriteLine("Completed");
}

感谢Theodor的建议,我现在找到了一个适合我的解决方案:

static async Task Main(string[] args)
{
var inputSequence = Observable.Interval(TimeSpan.FromMilliseconds(500));
var terminate = new Subject<Unit>();
var task = Execute(inputSequence, terminate);
Console.ReadLine();
terminate.OnNext(Unit.Default);
await task.ConfigureAwait(false);
Console.WriteLine($"Completed");
Console.ReadLine();
}
private static async Task Process(CancellationToken cts, long i)
{
Console.WriteLine($"Processing {i} ...");
await Task.Delay(1000).ConfigureAwait(false);
Console.WriteLine($"Finished processing {i}");
}
private static async Task Execute(IObservable<long> input, IObservable<Unit> terminate)
{
await input
.TakeUntil(terminate)
.Select(i => Observable.FromAsync(ct => Process(ct, i)))
.Concat();
}

相关内容

  • 没有找到相关文章

最新更新