如何正常终止可观察订阅?



我正在尝试使用反应式扩展(Rx(来处理数据流。但是,每个元素的处理可能需要一些时间。为了中断处理,我正在使用CancellationToken,这有效地停止了订阅。

请求取消后,如何正常完成当前工作并正确终止而不会丢失任何数据?

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
observable
.Subscribe(
value =>
{
Console.WriteLine(value);
Thread.Sleep(500); // Simulate processing

if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500); // Simulate some time consuming shutdown
Console.WriteLine("Cleaning up done for {0}.", value);
}
},
() => Console.WriteLine("Completed"),
cts.Token);

Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");

输出

0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.

从输出中可以看出,"作业终止"行不是最后一行,这意味着在应用程序终止之前,清理将没有足够的时间完成。

预期输出

0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.

"作业终止"行是要打印的最后一行。"取消"和"清洁"线已被允许花时间。

(编辑:添加了预期输出(

如果我正确理解了这个问题,这不是 Rx 问题,这是一个"无论你在订阅中做什么"问题。您的订阅操作需要半秒,清理的可能性还需要半秒,您的作业终止需要几微秒。您希望在取消和终止之间挤在什么

?我能给你的最好的建议是让订阅操作比Thread.Sleep调用更好地遵守取消令牌。

使用类似问题的答案以及有关在终止之前等待的问题的答案,我想出了一个可以满足我要求的解决方案。

我最初的问题是我找不到等待订阅线程的方法。上面链接的答案引导我以三种方式重构代码:

  1. 我将取消逻辑从订阅转移到可观察的。

  2. 订阅包装在其自己的Task中(因此可以继续执行ReadLine-语句(。

  3. 引入了一个ManualResetEvent来控制应用程序退出策略。

溶液:

var reset = new ManualResetEvent(false);
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250))
.TakeWhile(x => !cts.Token.IsCancellationRequested)
.Finally(
() =>
{
Console.WriteLine("Finally: Beginning finalization.");
Thread.Sleep(500);
Console.WriteLine("Finally: Done with finalization.");
reset.Set();
});
await Task.Factory.StartNew(
() => observable
.Subscribe(
value =>
{
Console.WriteLine("Begin: {0}", value);
Thread.Sleep(2000);
Console.WriteLine("End: {0}", value);
},
() => Console.WriteLine("Completed: Subscription completed.")),
TaskCreationOptions.LongRunning);
Console.ReadLine();
cts.Cancel();
reset.WaitOne();
Console.WriteLine("Job terminated.");

输出:

Begin: 0
End: 0
Begin: 1
Token cancelled.
End: 1
Completed: Subscription completed.
Finally: Beginning finalization.
Finally: Done with finalization.
Job terminated.

作为反应式扩展的新手,我不知道这是否是解决我问题的最佳解决方案。但这是对问题中发布的示例的极大改进,因为它满足了我的要求:

  • 允许每个 OnNext 操作运行到完成。
  • 应用程序等待流处理完成(由ManualResetEvent发出信号(。
  • 流取消逻辑在TakeWhile-方法中移动到生产者(而不是使用者(。
  • 应用程序终止逻辑是对生成者Finally方法中的流取消的反应。

这是一个更好的解决方案。

可观察量是(a(可等待的。无法等待对可观察量的订阅。因此,如果你想等待订阅代码完成,而不诉诸像使用ManualResetEvents 这样的人工解决方案,你应该让你的订阅代码成为派生可观察量的副作用,并且 (a(等待该可观察量。您的问题中提供的示例有其他要求,这使事情变得有点复杂,但不是那么复杂:

  1. 您希望在订阅可观察量和等待其完成(Console.ReadLine()等(之间执行其他操作。

  2. 您希望在取消CancellationToken时终止可观察量。

下面是如何满足这些要求的示例。它仅显示了解决此问题的众多可用方法之一:

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
var withCancellation = observable
.TakeUntil(Observable.Create<Unit>(observer =>
cts.Token.Register(() => observer.OnNext(default))));
var withSideEffectsAndCancellation = withCancellation
.Do(value =>
{
Console.WriteLine(value);
Thread.Sleep(500);
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500);
Console.WriteLine("Cleaning up done for {0}.", value);
}
}, () => Console.WriteLine("Completed"));
var hotWithSideEffectsAndCancellation = withSideEffectsAndCancellation
.Publish()
.AutoConnect(0);
Console.ReadLine();
cts.Cancel();
hotWithSideEffectsAndCancellation.DefaultIfEmpty().Wait();
// or await hotWithSideEffectsAndCancellation.DefaultIfEmpty();
Console.WriteLine("Job terminated.");

解释:

  1. .TakeUntil...cts.Token.Register...是一种惯用方法,可以在取消cts.Token时立即取消订阅可观察Interval。它是从相关问题复制粘贴的。您也可以使用更简单的.TakeWhile(x => !cts.Token.IsCancellationRequested),前提是您可以接受响应稍慢的取消。

  2. Do运算符是执行订阅副作用的自然方法,因为它与Subscribe方法具有相同的参数。

  3. .Publish().AutoConnect(0);使序列立即变热。AutoConnect运算符不提供与基础可观察值断开连接的方法(与RefCount运算符相反(,但在这种特殊情况下,不需要断开连接功能。基础可观察对象的生存期已由我们之前附加的CancellationToken控制。

  4. .Wait()之前的.DefaultIfEmpty()是必需的,以防止在生成任何元素之前取消序列的边缘情况下InvalidOperationException。如果异步await序列,也需要它。这些等待可观察量的机制(以及其他机制,如RunAsyncToTask运算符(返回可观察量发出的最后一个值,当不存在这样的值时,它们会感到沮丧。

相关内容

  • 没有找到相关文章

最新更新