考虑以下管道:
- 将项目缓冲到包中
- 在线程池线程中观察这些包
- 对这些包执行一些异步处理
如果该过程已经完成,则将源observable设置为complete将导致缓冲区按原样发射当前包。但是,处理部分将在获得最后一个包之前获得完整的事件。
我的想法是等待最后一个包被处理,但由于我在OnNext之前获得了OnComplete,我似乎无法通过ReactiveX机制来完成。
有什么方法可以让OnComplete在最后一个OnNext之后发生吗?
下面是一个模拟这种行为的示例(可用作.NET Fiddle):
var publications =
obs
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
.Buffer(2)
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
.SelectMany(x => Test(x).ToEnumerable())
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
publications
.Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
.Subscribe()
;
obs.OnNext(1);
obs.OnNext(2);
obs.OnNext(3);
var nextpub = publications.FirstAsync();
obs.OnCompleted();
nextpub.Wait();
这段代码解码起来有点乱。。。我想你只是订阅了两次publications
可观测:
- 一旦明确调用
Subscribe()
- 一旦您通过
FirstAsync()
隐式调用它
如果你按照以下方式重新排列一下。将您第一次订阅出版物的行替换为:
var tcs = new TaskCompletionSource<Unit>();
var nextpub = publications
.Do(x => Console.WriteLine("publications notify {0}", x),
() => Console.WriteLine("publications complete"))
.Subscribe(_ => {}, () => tcs.SetResult(Unit.Default));
删除带有FirstAsync()
的线路,并将对nextpub.Wait()
的调用替换为:
tcs.Task.Wait();
这不是一种推荐的写Rx代码的方法,只是修复代码的最快方法。您通常应该在订阅者中处理您的结果,而不是阻止以完成。例如:
SomeObservable.Subscribe(x => /* handle result */);
在Rx中,重要的是要理解可观察对象的行为契约。您将始终从内置运算符中获取此序列:
OnNext*(OnCompleted|OnError)?
因此,零个或多个(可能是无限个)"OnNext"调用,然后可选地是"OnCompleted"或"OnError"调用。
您将永远不会在"OnNext"之前获得"OnCompleted"-对于单个可观察对象。
现在,您的代码似乎有不同的行为,但事实并非如此。
实际上,您对源observable有两个独立的订阅。
以下是一个订阅的样子:
var publications =
obs
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
.Buffer(2)
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
.SelectMany(x => Test(x).ToEnumerable())
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
publications
.Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
.Subscribe(); /* Subscription #1 here! */
obs.OnNext(1);
obs.OnNext(2);
obs.OnNext(3);
obs.OnCompleted();
这是另一个:
var publications =
obs
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
.Buffer(2)
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
.SelectMany(x => Test(x).ToEnumerable())
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
var nextpub = publications.FirstAsync();
obs.OnCompleted();
nextpub.Wait(); /* Subscription #2 here! */
如果我们只看"Subscription#1",并将调度程序更改为Scheduler.Immediate
,我们将得到以下执行顺序:
to buffer 1
to buffer 2
from buffer [1, 2]
to selectmany [1, 2]
> thread: 28
notify ()
publications notify ()
to buffer 3
to buffer complete
from buffer [3]
to selectmany [3]
> thread: 28
notify ()
publications notify ()
from buffer complete
to selectmany complete
complete
publications complete
这看起来仍然像是在值出来之前得到了to buffer complete
。但这是一种误导。.Do(...)
运算符是专门引入的,以允许将副作用引入Rx管道。因此,这可能会让事情看起来不正常,但如果你在你创建的Rx管道中采取每一步,你会看到每一步都完美地遵循"OnNext*(OnCompleted|OnError)"契约。
你真的需要专注于管道中的一个步骤,你会发现这一切都是正确的。