RxJ避免外部状态,但仍然访问以前的值



我正在使用RxJs来侦听amqp queu(实际上并不相关)。

我有一个函数createConnection,它返回一个Observable,它发出新的连接对象。一旦我建立了连接,我想每1000ms发送一次消息,发送10条消息后我想关闭连接。

我试图避免外部状态,但如果我不将连接存储在外部变量中,我如何关闭它?请看,我从连接开始,然后是flatMap和推送消息,所以在几个链之后,我不再有连接对象。

这不是我的流程,但想象一下这样的东西:

createConnection()
  .flatMap(connection => connection.createChannel())
  .flatMap(channel => channel.send(message))
  .do(console.log)
  .subscribe(connection => connection.close()) <--- obviously connection isn't here

现在我明白这样做很愚蠢,但现在我该如何访问连接?我当然可以从var connection = createConnection() 开始

然后以某种方式加入其中。但是我该怎么做呢?我甚至不知道如何正确地问这个问题。Bottomline,我有一个可观察的,它会发出连接,在连接打开后,我想要一个每1000ms发出消息的可观察的(带有take(10)),然后关闭连接

问题的直接答案是"你可以完成每一步"。例如,您可以更换这条线路

.flatMap(connection => connection.createChannel())

这个:

.flatMap(connection => ({ connection: connection, channel: connection.createChannel() }))

并一直保持对连接的访问。

但还有另一种方法可以做你想做的事情。让我们假设你的createConnection和createChannel函数看起来像这样:

function createConnection() {
  return Rx.Observable.create(observer => {
    console.log('creating connection');
    const connection = {
      createChannel: () => createChannel(),
      close: () => console.log('disposing connection')
    };
    observer.onNext(connection);
    return Rx.Disposable.create(() => connection.close());
  });
}
function createChannel() {
  return Rx.Observable.create(observer => {
    const channel = {
      send: x => console.log('sending message: ' + x)
    };
    observer.onNext(channel);
    // assuming no cleanup here, don't need to return disposable
  });
}

createConnection(和createChannel,但我们将关注前者)返回冷可观测;每个订阅者都将获得包含单个连接的自己的连接流,当订阅到期时,将自动调用dispose逻辑。

这允许你做这样的事情:

const subscription = createConnection()
  .flatMap(connection => connection.createChannel())
  .flatMap(channel => Rx.Observable.interval(1000).map(i => ({ channel: channel, data: i })))
  .take(10)
  .subscribe(x => x.channel.send(x.data))
;

您实际上不必为了进行清理而处理订阅;在满足take(10)之后,整个链将结束并触发清理。你需要明确调用dispose订阅的唯一原因是,如果你想在10个1000ms间隔结束之前将其拆除。

请注意,此解决方案还包含对您的问题的直接回答的一个实例:我们将通道向下推,这样我们就可以在传递给订阅调用的onNext lambda中使用它(这通常是此类代码出现的地方)。

整个过程如下:https://jsbin.com/korihe/3/edit?js,控制台,输出

由于平面图等待可观察<(T)>({connection:connection,channel:connection.createChannel()})是一个Object。

.flatMap(connection => ({ connection: connection, channel: connection.createChannel() }))

相反,您可以使用组合最新操作员

.flatMap(connection => Observable.combineLatest( Observable.of(connection), connection.createChannel(), (connection, channel) => { ... code .... });

最新更新