使用RxScala进行响应式编程



我有一个通过Socket协议连接到服务的Observable。到套接字的连接通过客户端库进行。我使用的客户端库有java.util.Observer with,我可以注册被推入其中的事件

final class MyObservable extends Observable[MyEvent] {
  def subscribe(subscriber: Subscriber[MyEvent]) = {
    // connect to the Socket (Step: 1)
    // get the responses that are pushed (Step: 2)
    // transform them into MyEvent type (Step: 3)
  }
}

我有两个悬而未决的问题我不明白。

如何在订阅服务器中获取步骤:3的结果?

每次当我获得MyEvent时,像下面这样的订阅者,我看到有一个新的连接正在创建。最后,对每个传入事件运行步骤1、步骤2和步骤3。

val myObservable = new MyObservale()
myObservable.subscribe()

除非我误解了你的问题,否则你就叫onNext:

def subscribe(subscriber: Subscriber[MyEvent]) = {
  // connect to the Socket (Step: 1)
  // get the responses that are pushed (Step: 2)
  // transform them into MyEvent type (Step: 3)
  // finally notify the subscriber:
  subscriber.onNext(myEventFromStep3)
}

和订阅的代码会做这样的事情:

myObservable.subscribe(onNext = println(_))

相关内容

  • 没有找到相关文章

最新更新