为什么这个在 Flux.create 中带有块的 reator 代码不起作用?



>我尝试使用watchService作为Flux生成器,但它无法工作,我也尝试了一些简单的块,如Flux.create方法中的Thread.sleep,它可以工作。 我想知道为什么以及这些情况之间有什么区别?

可以工作的代码,

@Test
public void createBlockSleepTest() throws InterruptedException {
Flux.create(sink->{
while (true) {
try {
for(int i=0;i<2;i++)
sink.next(num++);
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).log().subscribeOn(Schedulers.parallel(),false).log()
.subscribe(System.out::println);
Thread.sleep(100000L);
}

无法工作的代码,

@Test
public void createBlockTest() throws IOException, InterruptedException {
WatchService watchService = fileSystem.newWatchService();
Path testPath = fileSystem.getPath("C:/testing");
Files.createDirectories(testPath);
WatchKey register = testPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE,StandardWatchEventKinds.ENTRY_MODIFY);
Files.write(testPath.resolve("test1.txt"),"hello".getBytes());
Thread.sleep(5000L);
Flux.create(sink->{
while (true) {
try {
WatchKey key = watchService.take();
System.out.println("-----------------"+key);
for(WatchEvent event:key.pollEvents()){
sink.next(event.context());
}
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).log().subscribeOn(Schedulers.parallel(),false).log()
.subscribe(System.out::println);
Files.write(testPath.resolve("test2.txt"),"hello".getBytes());
Thread.sleep(5000L);
Files.write(testPath.resolve("test3.txt"),"hello".getBytes());
Thread.sleep(10000L);
}

我注意到在反应堆的参考中,在创建方法中有一个阻塞通知。但是为什么Thread.sleep有效?

create不会并行化您的代码,也不会使其异步,甚至 尽管它可以与异步 API 一起使用。如果在createlambda 内阻塞, 您将自己暴露在死锁和类似的副作用中。即使使用subscribeOn, 需要注意的是,长阻塞createlambda(例如无限循环调用(sink.next(t)( 可以锁定管道:由于 循环使它们应该运行的同一线程匮乏。使用subscribeOn(Scheduler, false)变体:requestOnSeparateThread = false将使用Scheduler线程进行create并且仍然通过在原始线程中执行request来让数据流。

谁能解决我的难题?

这可以通过更改来修复

while (true) {
try {
WatchKey key = watchService.take();
System.out.println("-----------------"+key);
for(WatchEvent event:key.pollEvents()){
sink.next(event.context());
}
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

while (true) {
try {
WatchKey key = watchService.take();
System.out.println("-----------------"+key);
for(WatchEvent event:key.pollEvents()){
sink.next(event.context());
}
key.reset();
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

Thread.sleep(5000L)只会阻塞 5 秒,因此create将在延迟后继续前进,而WatchService#take无限期阻塞,除非注册新的WatchKey(在本例中为新文件(。由于创建文件的代码是在create之后,因此存在死锁情况。

相关内容

  • 没有找到相关文章

最新更新