我正在努力熟悉RxJava。以下是我想要实现的用例:
我的屏幕上有一个按钮,我正在收集点击的次数。因此,如果用户点击按钮,就会记录一次点击并生成日志。现在,如果用户单击按钮两次,那么它将记录两次单击,收集它们并输出2而不是1。
从本质上讲,我试图在一段时间内积累点击次数,然后得出最终结果。我猜"缓冲"是我需要使用的方法。我在Android上快速地举了一个例子(下面是代码),但是buffer方法似乎并不像收集所有事件输入并输出一个集合那么简单。
public class DemoFragment
extends Fragment {
private int _tapCount = 0;
private Observable<List<Integer>> _bufferedObservable;
private Observer<List<Integer>> _observer;
@Override
public void onActivityCreated(@Nullable Bundle savedInstanceState) {
super.onActivityCreated(savedInstanceState);
_setupLogger();
_bufferedObservable = _getBufferedObservable();
_observer = _getObserver();
}
// the library butterknife allows this
@OnClick(R.id.btn_start_operation)
public void onButtonTapped() {
_log("GOT A TAP");
_bufferedObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(_observer);
}
private Observable<List<Integer>> _getBufferedObservable() {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1); // send one tap here
}
}).buffer(2, TimeUnit.SECONDS); // collect all taps in the last 2s
}
private Observer<List<Integer>> _getObserver() {
return new Observer<List<Integer>>() {
@Override
public void onCompleted() {
_log(String.format("%d taps", _tapCount));
_tapCount = 0; // reset tap count
}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(List<Integer> integers) {
if (integers.size() > 0) {
for (int i : integers) {
_tapCount += i;
}
onCompleted();
} else {
_log("No taps received");
}
}
};
}
// ... other method that help wiring up the example (irrelevant to RxJava)
}
谁能帮我理解一下我在这里理解上的误解?问题1:我期待_getObserver()
的onNext
发送给我一个带有累计点击数的列表。因此,如果按钮被击中5次,那么我期待一个包含5个项目的列表,每个项目的值为"1"。与现有的代码,我总是得到一个空列表。
问题2:如果通过检查List<Integer> integers
大小没有收到事件,我基本上会做控制台日志。如果列表不是空的,我抛出一个控制台日志,说"没有收到水龙头"。可见对象似乎永远不会停止。它几乎就像一个计时器,即使在没有注册按钮点击的情况下,它也会一直持续下去。有没有办法停止可观察对象,如果在过去的10秒内没有注册事件?
问题3:排放的数量似乎几乎呈指数增长。这几乎就像它收集了以前所有的按钮空点击。
这里是一个代码显示我将如何做到这一点(假设您的按钮的id是R.id.rx_button
):
private Subscription mSubscription;
@Override
protected void onResume() {
super.onResume();
mSubscription = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
findViewById(R.id.rx_button).setOnClickListener(new OnClickListener() {
@Override
public void onClick(View v) {
subscriber.onNext(1);
}
});
}
}).buffer(2, TimeUnit.SECONDS)
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
Log.i("TAG", String.valueOf(integers.size()));
}
});
}
@Override
protected void onPause() {
super.onPause();
mSubscription.unsubscribe();
}
简单地说,在call
方法上关闭OnClickListener
实现,这样您就可以在其中使用subscriber
对象。
onResume
使用lambda会看起来更好(看看Retrolambda项目):
@Override
protected void onResume() {
super.onResume();
mSubscription = Observable.create((Subscriber<? super Integer> subscriber) ->
findViewById(R.id.rx_button).setOnClickListener(view ->
subscriber.onNext(1))).buffer(2, TimeUnit.SECONDS)
.subscribe(integers -> Log.i("TAG", String.valueOf(integers.size())));
}
我认为最好使用很棒的rx-binding库。我认为解决方案更简洁,尤其是用lambda。我使用了过滤器操作符,所以我只在onNext()中得到结果,其中用户在给定的2秒内按了5次以上。
我基于一个来自(RxJava-Android-Samples)的例子,它使用了一个旧的rx绑定依赖。
@Override
public void onResume() {
super.onResume();
mClickSubscription = getBufferedSubscription();
}
@Override
public void onPause() {
super.onPause();
mClickSubscription.unsubscribe();
}
private Subscription getBufferedSubscription() {
return RxView.clicks(rx_button)
.map(aVoid -> 1)
.buffer(2, TimeUnit.SECONDS)
.filter(integers -> integers.size() > 5)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<Integer>>() {
@Override
public void onCompleted() {
// fyi: you'll never reach here
}
@Override
public void onError(Throwable e) {
SLog.i("Dang error! check your logs");
}
@Override
public void onNext(List<Integer> integers) {
SLog.i(String.format("%d taps", integers.size()));
}
});
}
试试下面的代码:
public class BufferExampleActivity extends AppCompatActivity {
private static final String TAG = BufferExampleActivity.class.getSimpleName();
Button btn;
TextView textView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_example);
btn = (Button) findViewById(R.id.btn);
textView = (TextView) findViewById(R.id.textView);
btn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
doSomeWork();
}
});
}
/*
* simple example using buffer operator - bundles all emitted values into a list
*/
private void doSomeWork() {
Observable<List<String>> buffered = getObservable().buffer(3, 1);
// 3 means, it takes max of three from its start index and create list
// 1 means, it jumps one step every time
// so the it gives the following list
// 1 - one, two, three
// 2 - two, three, four
// 3 - three, four, five
// 4 - four, five
// 5 - five
buffered.subscribe(getObserver());
}
private Observable<String> getObservable() {
return Observable.just("one", "two", "three", "four", "five");
}
private Observer<List<String>> getObserver() {
return new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(List<String> stringList) {
textView.append(" onNext size : " + stringList.size());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : size :" + stringList.size());
for (String value : stringList) {
textView.append(" value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " : value :" + value);
}
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
}