RxJava2: two subscribers on one Observable/Flowable, but each event is delivered to only one of them



rxjava2 version 2.1.5

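I am trying to understand multiple subscriptions to one observable in RxJava2. I have a simple file watch service that tracks the creation, modification, and deletion of files in a directory. I added two subscribers and expected both of them to print the events. When I copy a file into the watched directory, I see one subscriber print the events. Then, when I delete the file, I see the second subscriber print the event. I expected both subscribers to print every event. What am I missing here?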

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class MyRxJava2DirWatcher {
    public Flowable<WatchEvent<?>> createFlowable(WatchService watcher, Path path) {
        return Flowable.create(subscriber -> {
            boolean error = false;
            WatchKey key;
            try {
                key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
            }
            catch (IOException e) {
                subscriber.onError(e);
                error = true;
            }
            while (!error) {
                key = watcher.take();
                for (final WatchEvent<?> event : key.pollEvents()) {
                    subscriber.onNext(event);
                }
                key.reset();
            }
        }, BackpressureStrategy.BUFFER);
    }
    public static void main(String[] args) throws IOException, InterruptedException {
        // Backslashes must be escaped in a Java string literal.
        Path path = Paths.get("c:\\temp\\delete");
        final FileSystem fileSystem = path.getFileSystem();
        WatchService watcher = fileSystem.newWatchService();
        MyRxJava2DirWatcher my = new MyRxJava2DirWatcher();
        my.createFlowable(watcher, path).subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("1>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());
        }, onError -> {
            System.out.println("1>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });
        // MyRxJava2DirWatcher my2 = new MyRxJava2DirWatcher();
        my.createFlowable(watcher, path).subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("2>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());
        }, onError -> {
            System.out.println("2>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });
        TimeUnit.MINUTES.sleep(1000);
    }
}

The output looks like this:

2>>Event kind:ENTRY_CREATE. File affected: 1.txt. RxCachedThreadScheduler-2
2>>Event kind:ENTRY_MODIFY. File affected: 1.txt. RxCachedThreadScheduler-2
1>>Event kind:ENTRY_DELETE. File affected: 1.txt. RxCachedThreadScheduler-1

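What happens is that you are sharing the same WatchService between the two Flowables, and they compete for its events: each event returned by take() goes to only one of the two loops. If you instead pass in the FileSystem and call newWatchService() inside Flowable.create, every Subscriber gets its own WatchService and should receive all events: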

public Flowable<WatchEvent<?>> createFlowable(FileSystem fs, Path path) {
    return Flowable.create(subscriber -> {
        WatchService watcher = fs.newWatchService();
        subscriber.setCancellable(() -> watcher.close());
        boolean error = false;
        WatchKey key;
        try {
            key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
        }
        catch (IOException e) {
            subscriber.onError(e);
            error = true;
        }
        while (!error) {
            key = watcher.take();
            for (final WatchEvent<?> event : key.pollEvents()) {
                subscriber.onNext(event);
            }
            key.reset();
        }
    }, BackpressureStrategy.BUFFER);
}
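
The competition described above can be reproduced without RxJava or a real directory. The following is only a sketch under stated assumptions: a BlockingQueue stands in for the WatchService, each consumer thread stands in for one Flowable's take() loop, and the class name, method names, and the STOP marker are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Analogy only: the queue plays the role of the WatchService.
class SharedSourceRace {
    static final String STOP = "STOP"; // hypothetical end marker, not a WatchService concept

    // Starts a thread that take()s from the source until it sees STOP.
    static Thread consumer(BlockingQueue<String> source, List<String> seen) {
        Thread t = new Thread(() -> {
            try {
                for (String e = source.take(); !e.equals(STOP); e = source.take()) {
                    seen.add(e);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Two consumers share ONE source: every event reaches exactly one of them.
    static int[] sharedSource(int events) {
        BlockingQueue<String> shared = new LinkedBlockingQueue<>();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        Thread a = consumer(shared, first);
        Thread b = consumer(shared, second);
        for (int i = 0; i < events; i++) shared.add("event-" + i);
        shared.add(STOP); // one marker per consumer
        shared.add(STOP);
        join(a);
        join(b);
        return new int[] { first.size(), second.size() };
    }

    // Each consumer has its OWN source: both see every event.
    static int[] separateSources(int events) {
        BlockingQueue<String> q1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> q2 = new LinkedBlockingQueue<>();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        Thread a = consumer(q1, first);
        Thread b = consumer(q2, second);
        for (int i = 0; i < events; i++) {
            q1.add("event-" + i);
            q2.add("event-" + i);
        }
        q1.add(STOP);
        q2.add(STOP);
        join(a);
        join(b);
        return new int[] { first.size(), second.size() };
    }

    public static void main(String[] args) {
        int[] shared = sharedSource(10);
        int[] separate = separateSources(10);
        // The split between the two shared consumers varies from run to run.
        System.out.println("shared:   " + shared[0] + " + " + shared[1]);
        System.out.println("separate: " + separate[0] + " and " + separate[1]);
    }
}
```

With the shared source the two counts always add up to the number of events, which matches the output in the question where each event was printed by only one subscriber.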

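Also note that you should use subscribeOn(Schedulers.computation(), false) to avoid a deadlock: with requestOn set to false, the Subscriber's requests are not rerouted to the scheduler thread, where they would pile up behind the blocking take() loop.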
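
The reason a blocking emitter can starve work queued on the same worker can be shown with plain JDK classes. This is a rough analogy, not RxJava's implementation: a single-threaded executor stands in for one scheduler worker, the first task stands in for the while loop around watcher.take(), and the second task stands in for a request routed to that same worker. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Analogy only: one executor thread stands in for one scheduler worker.
class BlockedWorkerDemo {

    // Returns true if the "request" task could not run while the
    // blocking loop was occupying the single worker thread.
    static boolean requestStarvedWhileLoopRuns() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch stopLoop = new CountDownLatch(1);
        try {
            // Stands in for the emitter's blocking loop.
            worker.submit(() -> {
                stopLoop.await();
                return null;
            });
            // Stands in for a downstream request queued on the same worker.
            Future<String> request = worker.submit(() -> "request handled");

            boolean starved;
            try {
                request.get(200, TimeUnit.MILLISECONDS);
                starved = false;
            } catch (TimeoutException e) {
                starved = true; // still queued behind the blocking loop
            }

            // Once the loop ends, the queued request finally runs.
            stopLoop.countDown();
            request.get(5, TimeUnit.SECONDS);
            return starved;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("request starved: " + requestStarvedWhileLoopRuns());
    }
}
```

In the watcher code the loop never ends on its own, so anything queued behind it on the same worker would wait forever; keeping requests off that worker avoids the problem.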

You are creating two different Flowables for the two subscribers. Make it a single Flowable that is subscribed to twice, as shown below.

public static void main(String[] args) throws IOException, InterruptedException {
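        // Backslashes must be escaped in a Java string literal.
        Path path = Paths.get("c:\\temp\\delete");
        final FileSystem fileSystem = path.getFileSystem();
        WatchService watcher = fileSystem.newWatchService();
        MyRxJava2DirWatcher my = new MyRxJava2DirWatcher();
        // Keep the type parameter; with a raw Flowable, event.kind() does not compile.
        Flowable<WatchEvent<?>> myFlowable = my.createFlowable(watcher, path);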
        myFlowable.subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("1>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());
        }, onError -> {
            System.out.println("1>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });
        myFlowable.subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("2>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());
        }, onError -> {
            System.out.println("2>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });
        TimeUnit.MINUTES.sleep(1000);
    }
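
One caveat worth keeping in mind: a Flowable built with Flowable.create is cold, so every subscribe call runs the emitter lambda again, and two subscriptions to the same instance still mean two loops taking from the one WatchService. For both subscribers to see every event from a single watcher, one loop has to read the events and forward each of them to all subscribers; in RxJava 2 the publish() (with connect() or autoConnect()) and share() operators serve that purpose. The sketch below shows only the fan-out idea with JDK classes. It is an assumption-laden illustration, not RxJava's implementation: the queue stands in for the WatchService, and the class name, method name, and STOP marker are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Analogy only: a single dispatcher loop forwards each event to all listeners.
class FanOutDemo {
    static final String STOP = "STOP"; // hypothetical end marker

    static int[] fanOut(int events) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();

        // Both "subscribers" are registered with the one dispatcher.
        List<Consumer<String>> listeners = new ArrayList<>();
        listeners.add(first::add);
        listeners.add(second::add);

        // The only thread that ever calls take() on the source.
        Thread dispatcher = new Thread(() -> {
            try {
                for (String e = source.take(); !e.equals(STOP); e = source.take()) {
                    for (Consumer<String> listener : listeners) {
                        listener.accept(e);
                    }
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.start();

        for (int i = 0; i < events; i++) source.add("event-" + i);
        source.add(STOP);
        try {
            dispatcher.join();
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
        return new int[] { first.size(), second.size() };
    }

    public static void main(String[] args) {
        int[] counts = fanOut(5);
        System.out.println("first=" + counts[0] + " second=" + counts[1]); // first=5 second=5
    }
}
```

Because only one loop consumes the source, there is nothing left for the subscribers to compete over.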
