我用RX迈出了第一步。我读了一些关于它的文章,但我认为亲自动手是更好的方法。所以我开始把我现有的代码转换成Rx类型的代码。
目标:我试图模拟一个以特定频率发送数据的源(例如60/s,摄像机或其他)。我有一段录像是用来模拟信号源的而信号源是不可用的。我需要源开始发送,即使没有人在听,因为这是真正的源会做的。
在Rx之前,我去做了一个Runnable,它只是迭代15000个数据项,将项目发送到我的RabbitMQ服务器并休眠1/60s,然后发送下一个。
现在我想把这个逻辑变成一个热门的可观察对象,只是为了玩。到目前为止,我有这个:
Observable.from(mDataItems)
.takeWhile(item -> mRunning)
.map(mGson::toJson)
.doOnNext(json -> {
try {
mChannel.basicPublish(EXCHANGE_NAME, "", null, json.getBytes());
} catch (IOException e) {
Logger.error(e, String.format("Could not publish to %s exchange", EXCHANGE_NAME));
}
try {
Thread.sleep(1 / SENDING_FREQUENCY_IN_HZ);
} catch (InterruptedException e) {
Logger.error(e, String.format("Could not sleep for %d ms", (int) (1000 / SENDING_FREQUENCY_IN_HZ)));
}
})
.doOnCompleted(() -> {
if (mRunning)
Logger.info("All data sent");
else
Logger.info("Interrupted while sending");
disconnect();
mRunning = false;
})
.subscribeOn(Schedulers.io())
.publish()
.connect();
尽管它到目前为止工作,我不知道这是否是一个"好"的方式来创建一个热门的可观察对象(或一般的可观察对象),只是发出项目。(我也不知道我是否应该使用一个主题而不是一个可观察对象,但这是另一个问题)。
是的,还有一个选择:
int delay = 1000 / frequency;
Observable o = Observable.from(dataItems)
.zipWith(
Observable.timer(delay, delay, TimeUnit.MILLISECONDS)
.onBackpressureDrop(),
(s, t) -> s)
.map(mGson::toJson)
// other ops as necessary
.subscribeOn(Schedulers.io())
.publish();
o.connect();
o.subscribe(...);