I have a PublishSubject that receives Strings and emits Articles:
private PublishSubject<String> articleSubject;

public Observable<Article> newArticleSubject() {
    articleSubject = PublishSubject.create();
    return articleSubject.flatMap(new Func1<String, Observable<Article>>() {
        @Override
        public Observable<Article> call(String articleId) {
            return dataModel.getArticleById(articleId);
        }
    });
}

articleSubject.onNext("1234");
I want to create a class ArticleSubject extending Subject<String, Article> that encapsulates this behavior. I tried:
public class ArticleSubject extends Subject<String, Article> {
    private PublishSubject<String> subject;

    protected ArticleSubject(OnSubscribe<Article> articleOnSubscribe, final IMainDataModel dataModel) {
        super(articleOnSubscribe); //<---- ?????
        this.subject = PublishSubject.create();
        this.subject.flatMap(new Func1<String, Observable<Article>>() {
            @Override
            public Observable<Article> call(String s) {
                return dataModel.getArticleById(s);
            }
        });
    }

    @Override
    public boolean hasObservers() {
        return subject.hasObservers();
    }

    @Override
    public void onCompleted() {
        subject.onCompleted();
    }

    @Override
    public void onError(Throwable e) {
        subject.onError(e);
    }

    @Override
    public void onNext(String s) {
        subject.onNext(s);
    }
}
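But what do I do with the OnSubscribe in the constructor? Do I need to maintain the subscribers and everything else myself? Is there a way to delegate that to the PublishSubject?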
I found a generic solution:
public abstract class SubjectBinding<T, E> implements Observer<T> {
    private PublishSubject<T> origin;

    public SubjectBinding() {
        this.origin = PublishSubject.create();
    }

    @Override
    public void onCompleted() {
        origin.onCompleted();
    }

    @Override
    public void onError(Throwable e) {
        origin.onError(e);
    }

    @Override
    public void onNext(T t) {
        origin.onNext(t);
    }

    public Observable<E> asObservable() {
        return origin.flatMap(new Func1<T, Observable<E>>() {
            @Override
            public Observable<E> call(T t) {
                return asObservable(t);
            }
        });
    }

    protected abstract Observable<E> asObservable(T t);
}
and
private class ArticleSubject extends SubjectBinding<String, Article> {
    @Override
    protected Observable<Article> asObservable(String s) {
        return dataModel.getArticleById(s);
    }
}
Now I can send String values:
articleSubject.onNext("1234");
and emit Articles:
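// Keep a reference so that articleSubject.onNext(...) feeds this binding.
private ArticleSubject articleSubject;
public Observable<Article> newArticleSubject() {
    articleSubject = new ArticleSubject();
    return articleSubject.asObservable();
}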
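
For anyone who wants to see the data flow of the binding without pulling in RxJava, here is a rough plain-Java model of the same shape. It is only a sketch and not RxJava: SimpleBinding, BindingDemo and lookup are hypothetical names made up for illustration, a List stands in for the Observable returned by getArticleById, and a String stands in for Article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified stand-in for SubjectBinding. NOT RxJava, just the same shape:
// values of type T go in, mapped values of type E come out.
final class SimpleBinding<T, E> {
    private final List<Consumer<E>> observers = new ArrayList<>();
    private final Function<T, List<E>> mapper;

    SimpleBinding(Function<T, List<E>> mapper) {
        this.mapper = mapper;
    }

    // Output side: plays the role of asObservable().subscribe(...).
    void subscribe(Consumer<E> observer) {
        observers.add(observer);
    }

    // Input side: plays the role of onNext(T). Each input is expanded by
    // the mapper and every resulting item is pushed to all observers.
    void onNext(T value) {
        for (E item : mapper.apply(value)) {
            for (Consumer<E> observer : observers) {
                observer.accept(item);
            }
        }
    }
}

final class BindingDemo {
    // Hypothetical lookup standing in for dataModel.getArticleById(id).
    static List<String> lookup(String id) {
        return Arrays.asList("Article#" + id);
    }

    // Subscribes first, then pushes ids, and returns what the observer saw.
    static List<String> run(String... ids) {
        SimpleBinding<String, String> binding = new SimpleBinding<>(BindingDemo::lookup);
        List<String> received = new ArrayList<>();
        binding.subscribe(received::add);
        for (String id : ids) {
            binding.onNext(id);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("1234", "5678")); // prints [Article#1234, Article#5678]
    }
}
```

As with a PublishSubject, anything pushed through onNext before an observer has subscribed is simply dropped in this model, so the subscription has to be in place before the first id is sent.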