使用RxJava Flowable -如何根据事件的类型拆分发射



假设我们使用Firebase ChildEventListener,它可以作为多个数据源(它的函数),我用Flowable或Observable包装它。我希望在每个源中,发射器将数据发送到不同的管道,因为在每种情况下数据都可以更改,我希望以不同的方式处理它-即,根据事件的类型将发射分成几个不同的流。

如何在Java中实现?

public void newUsers() {
DatabaseReference ref = database.getReference().child("Users");
Flowable.create(emitter -> {
ref.addChildEventListener(new ChildEventListener() {
@Override
public void onChildAdded(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
User userData = dataSnapshot.getValue(User.class);
emitter.onNext(userData);
}
@Override
public void onChildChanged(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
User userData = dataSnapshot.getValue(User.class);
emitter.onNext(userData);
}
@Override
public void onChildRemoved(@NonNull DataSnapshot dataSnapshot) {
User userData = dataSnapshot.getValue(User.class);
emitter.onNext(userData);
}
@Override
public void onChildMoved(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
emitter.onNext(userData);
}
@Override
public void onCancelled(@NonNull DatabaseError databaseError) {
User userData = dataSnapshot.getValue(User.class);
emitter.onNext(userData);
}
});
}
}

所以,有多个流,我们不能有一个单一的Flowable。我不知道Flowable是否真的是一个要求,所以我只是概述我的想法,你可以随意修改它并适应你的需求。

BehaviourSubject childAddedStream;
BehaviourSubject childChangedStream;
BehaviourSubject childRemovedStream;
BehaviourSubject childMovedStream;
BehaviourSubject cancelledStream;
public void newUsers() {
DatabaseReference ref = database.getReference().child("Users");

ref.addChildEventListener(new ChildEventListener() {
@Override
public void onChildAdded(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
User userData = dataSnapshot.getValue(User.class);
childAddedStream.onNext(userData);
}
@Override
public void onChildChanged(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
User userData = dataSnapshot.getValue(User.class);
childChangedStream.onNext(userData);
}
@Override
public void onChildRemoved(@NonNull DataSnapshot dataSnapshot) {
User userData = dataSnapshot.getValue(User.class);
childRemovedStream.onNext(userData);
}
@Override
public void onChildMoved(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
childMovedStream.onNext(userData);
}
@Override
public void onCancelled(@NonNull DatabaseError databaseError) {
User userData = dataSnapshot.getValue(User.class);
cancelledStream.onNext(userData);
}
});
}
}

这样的东西将是我的起点。

这个想法是让这个类为每个回调初始化不同的主题,当它被创建时注册回调到firebaseddatabase,并将单个回调路由到不同的流(主题,如果你想的话,可以随意使用一个更相关的主题,即BehaviourSubject) -然后谁需要监听只是监听相关的流,而不是单一的Flowable,它只是充当回调和响应世界之间的桥梁。

最新更新