Implementing a custom Subject<T, E> in RxJava



I have a PublishSubject that receives Strings and emits Articles:

private PublishSubject<String> articleSubject;
public Observable<Article> newArticleSubject() {
    articleSubject = PublishSubject.create();
    return articleSubject.flatMap(new Func1<String, Observable<Article>>() {
        @Override
        public Observable<Article> call(String articleId) {
            return dataModel.getArticleById(articleId);
        }
    });
}
// Elsewhere, push an article id into the subject:
articleSubject.onNext("1234");

I want to create a class ArticleSubject that extends Subject<String, Article> and encapsulates this behavior. This is what I tried:

public class ArticleSubject extends Subject<String, Article> {
    private PublishSubject<String> subject;
    protected ArticleSubject(OnSubscribe<Article> articleOnSubscribe, final IMainDataModel dataModel) {
        super(articleOnSubscribe); //<---- ?????
        this.subject = PublishSubject.create();
        this.subject.flatMap(new Func1<String, Observable<Article>>() {
            @Override
            public Observable<Article> call(String s) {
                return dataModel.getArticleById(s);
            }
        });
    }
    @Override
    public boolean hasObservers() {
        return subject.hasObservers();
    }
    @Override
    public void onCompleted() {
        subject.onCompleted();
    }
    @Override
    public void onError(Throwable e) {
        subject.onError(e);
    }
    @Override
    public void onNext(String s) {
        subject.onNext(s);
    }
}

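But what should I do with the OnSubscribe in the constructor? Do I have to keep track of the subscribers and everything else myself, or is there a way to delegate that to the PublishSubject?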
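To make the delegation idea concrete, here is a minimal, dependency-free sketch. It does not use RxJava: MiniPublishSubject and MappingSubject are hypothetical stand-ins written only for illustration, and the lookup function stands in for the flatMap call to dataModel.getArticleById. The point it tries to show is that the wrapper keeps no subscriber list of its own. Its subscribe method plays the role an OnSubscribe would play, handing each new subscriber to the inner source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for PublishSubject: it owns the subscriber list.
final class MiniPublishSubject<T> {
    private final List<Consumer<? super T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<? super T> subscriber) {
        subscribers.add(subscriber);
    }

    void onNext(T value) {
        // Iterate over a copy so the list may change while values are delivered.
        for (Consumer<? super T> s : new ArrayList<>(subscribers)) {
            s.accept(value);
        }
    }

    boolean hasObservers() {
        return !subscribers.isEmpty();
    }
}

// Hypothetical wrapper: takes T in, hands R out, keeps no subscriber state itself.
final class MappingSubject<T, R> {
    private final MiniPublishSubject<T> input = new MiniPublishSubject<>();
    private final Function<T, List<R>> lookup; // stand-in for the flatMap function

    MappingSubject(Function<T, List<R>> lookup) {
        this.lookup = lookup;
    }

    // Plays the OnSubscribe role: each new subscriber is forwarded to the inner source.
    void subscribe(Consumer<? super R> child) {
        input.subscribe(value -> lookup.apply(value).forEach(child));
    }

    void onNext(T value) {
        input.onNext(value);
    }

    boolean hasObservers() {
        return input.hasObservers();
    }
}

final class MappingSubjectDemo {
    public static void main(String[] args) {
        MappingSubject<String, String> articles =
                new MappingSubject<>(id -> Collections.singletonList("Article#" + id));
        articles.subscribe(System.out::println);
        articles.onNext("1234"); // prints Article#1234
    }
}
```

In this model, as with a PublishSubject, a value pushed before anyone has subscribed is simply dropped. In RxJava terms the same shape would mean passing super(...) an OnSubscribe whose call subscribes the incoming subscriber to the flatMap result, instead of discarding that result as the constructor above does.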

I found a generic solution:

public abstract class SubjectBinding<T, E> implements Observer<T> {
    private PublishSubject<T> origin;
    public SubjectBinding() {
        this.origin = PublishSubject.create();
    }
    @Override
    public void onCompleted() {
        origin.onCompleted();
    }
    @Override
    public void onError(Throwable e) {
        origin.onError(e);
    }
    @Override
    public void onNext(T t) {
        origin.onNext(t);
    }
    public Observable<E> asObservable() {
        return origin.flatMap(new Func1<T, Observable<E>>() {
            @Override
            public Observable<E> call(T t) {
                return asObservable(t);
            }
        });
    }
    protected abstract Observable<E> asObservable(T t);
}

private class ArticleSubject extends SubjectBinding<String, Article> {
    @Override
    protected Observable<Article> asObservable(String s) {
        return dataModel.getArticleById(s);
    }
}

Now I can send String values:

articleSubject.onNext("1234");

and emit Articles:

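private ArticleSubject articleSubject;
public Observable<Article> newArticleSubject() {
    // Keep the instance so that articleSubject.onNext(...) reaches this chain.
    articleSubject = new ArticleSubject();
    return articleSubject.asObservable();
}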
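
One thing to watch with this binding: asObservable() applies flatMap on top of the PublishSubject, so every subscriber gets its own mapping step and the lookup runs once per subscriber for each id. The sketch below models that without RxJava. BindingModel is a hypothetical stand-in, and a plain function replaces asObservable(T).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of SubjectBinding: a hot input plus a per-subscriber mapping step.
final class BindingModel<T, E> {
    private final List<Consumer<T>> origin = new ArrayList<>(); // stand-in for PublishSubject<T>
    private final Function<T, E> lookup;                        // stand-in for asObservable(T)

    BindingModel(Function<T, E> lookup) {
        this.lookup = lookup;
    }

    // Like subscribing to asObservable(): each subscriber gets its own mapping step.
    void subscribe(Consumer<? super E> subscriber) {
        origin.add(value -> subscriber.accept(lookup.apply(value)));
    }

    void onNext(T value) {
        for (Consumer<T> link : new ArrayList<>(origin)) {
            link.accept(value);
        }
    }
}

final class BindingModelDemo {
    // Pushes one id to two subscribers and returns how many lookups ran.
    static int lookupsForTwoSubscribers() {
        AtomicInteger lookups = new AtomicInteger();
        BindingModel<String, String> binding = new BindingModel<>(id -> {
            lookups.incrementAndGet();
            return "Article#" + id;
        });
        binding.subscribe(article -> { });
        binding.subscribe(article -> { });
        binding.onNext("1234");
        return lookups.get();
    }

    public static void main(String[] args) {
        System.out.println(lookupsForTwoSubscribers()); // prints 2
    }
}
```

If getArticleById is expensive, for example a network call, one option in RxJava is to apply share() to the result of asObservable() so that all subscribers reuse a single chain. Whether that fits depends on when subscribers arrive, since late subscribers still miss earlier ids.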
