subscribeOn(Schedulers.parallel()) is not working



我正在学习反应堆堆芯并遵循以下内容https://www.baeldung.com/reactor-core

ArrayList<Integer> arrList = new ArrayList<Integer>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(arrList::add);
System.out.println("After: " + arrList);

当我执行上面的代码行时,发出。

Before: []
[DEBUG] (main) Using Console logging
After: []

上面的代码行应该在另一个线程中开始执行,但它根本不起作用。有人能帮我吗?

如反应堆文件中提到的各种subscribe方法:

请记住,由于序列可以是异步的立即将控制权返回给调用线程。这可以给在主线程中执行时未调用使用者的印象或者例如单元测试。

这意味着主方法已经结束,因此主线程在任何线程能够订阅Reactive链之前退出,正如Piotr所提到的。

您要做的是等待整个链完成后再打印数组的内容。

做这件事的天真方法是:

ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.blockLast();
System.out.println("After: " + arrList);

在这里,您可以阻止主线程上的执行,直到Flux上的最后一个元素得到处理。因此,在ArrayList完全填充之前,最后一个System.out不会执行。

请记住,代码在控制台应用程序中运行的方式与在Netty这样的服务器环境中运行的方法有点不同。使Console应用程序等待所有订阅生效的唯一方法是block

但是在并行线程上不允许阻塞。因此,这种方法在Netty环境中不起作用。在那里,您的服务器将一直运行,直到明确关闭,因此subscribe就可以了。

但是,在上面的代码片段中,您进行阻塞不仅是为了防止应用程序退出,也是为了在读取已填充的数据之前等待。

对上述代码的改进如下:

ArrayList<Integer> arrList = new ArrayList<>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.doOnNext(arrList::add)
.doOnComplete(() -> System.out.println("After: " + arrList))
.blockLast();

即使在这里,doOnComplete也可以访问来自反应链之外的数据。为了防止这种情况,你可以在链本身收集通量的元素,如下所示:

System.out.println("Before.");
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.collectList()
.doOnSuccess(list -> System.out.println("After: " + list))
.block();

再次记住,当在Netty中运行时(比如SpringWebflux应用程序),上面的代码将以subscribe()结束。

不过,请注意,从Flux切换到List(或任何Collection)意味着您正在从被动范式切换到命令式编程。您应该能够实现Reactive范式本身中的任何功能。

我觉得有些混乱。当您呼叫subscribeOn(Schedulers.parallel())时。您指定要在不同线程上接收项目。此外,您还必须放慢代码的速度,以便订阅cen真正生效(这就是我添加Thread.sleep(100)的原因)。如果你运行我传递的代码,它就会工作。你们看,反应堆里并没有神奇的同步机制。

ArrayList<Integer> arrList = new ArrayList<Integer>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(
t -> {
System.out.println(t + " thread id: " + Thread.currentThread().getId());
arrList.add(t);
}
);
System.out.println("size of arrList(before the wait): " + arrList.size());
System.out.println("Thread id: "+ Thread.currentThread().getId() + ": id of main thread ");
Thread.sleep(100);
System.out.println("size of arrList(after the wait): " + arrList.size());

如果您想将您的项目添加到并行reactor中的列表中,这不是一个好的选择。最好在java8中使用并行流。

List<Integer> collect = Stream.of(1, 2, 3, 4)
.parallel()
.map(i -> i * 2)
.collect(Collectors.toList());

当涉及到并发部分时,您发布的教程不是很精确。值得称赞的是,作者说还有更多的文章要发表。但我认为这根本不应该发布那个特定的例子,因为它会造成混乱。我建议不要那么信任互联网上的资源:)

最新更新