如果观察者正在使用observe_on(rxcpp::observe_on_new_thread()):
,那么等待所有观察者on_completed被调用的正确方法是什么?例如:{
Foo foo;
auto generator = [&](rxcpp::subscriber<int> s)
{
s.on_next(1);
// ...
s.on_completed();
};
auto values = rxcpp::observable<>::create<int>(generator).publish();
auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
.subscribe([&](int) { slow_function(foo); }));
auto lifetime = rxcpp::composite_subscription();
lifetime.add([&](){ wrapper.log("unsubscribe"); });
auto s2 = values.ref_count().as_blocking().subscribe(lifetime);
// hope to call something here to wait for the completion of
// s1's on_completed function
}
// the program usually crashes here when foo goes out of scope because
// the slow_function(foo) is still working on foo. I also noticed that
// s1's on_completed never got called.
我的问题是如何等待,直到s1的on_completed完成,而不必设置和轮询一些变量。
使用observe_on()的动机是因为通常值上有多个观察者,我希望每个观察者并发运行。也许有不同的方法可以达到同样的目标,我愿意接受你所有的建议。
合并两个订阅将允许一个阻塞订阅等待两个订阅完成。
{
Foo foo;
auto generator = [&](rxcpp::subscriber<int> s)
{
s.on_next(1);
s.on_next(2);
// ...
s.on_completed();
};
auto values = rxcpp::observable<>::create<int>(generator).publish();
auto work = values.
observe_on(rxcpp::observe_on_new_thread()).
tap([&](int c) {
slow_function(foo);
}).
finally([](){printf("s1 completedn");}).
as_dynamic();
auto start = values.
ref_count().
finally([](){printf("s2 completedn");}).
as_dynamic();
// wait for all to finish
rxcpp::observable<>::from(work, start).
merge(rxcpp::observe_on_new_thread()).
as_blocking().subscribe();
}
有几点。
流必须返回相同的类型才能进行merge。如果组合不同类型的流,请使用combine_latest。
observable<>::from()中可观察对象的顺序很重要,开始流有ref_count,所以它必须最后调用,以便下面的合并将在启动生成器之前订阅工作。
合并有两个线程调用它。这需要使用线程安全的协调。RXCPP是付费使用的。默认情况下,操作符假定所有调用都来自同一个线程。任何从多个线程获得调用的操作符都需要被赋予一个线程安全的协调,该操作符使用该协调来强制执行线程安全的状态管理和输出调用。