消息处理节流/背压



我有消息的来源,它是Observable。对于每条消息,我都想进行一个HTTP调用,该调用将产生另一个Observable,所以我将它们与flatMap组合在一起,然后将它们汇给某个订阅者。这里是这个场景的代码:

Rx.Observable.interval(1000)
.flatMap (tick) ->
  // returns an `Observable`
  loadMessages()
.flatMap (message) ->
  // also returns and `Observable`
  makeHttpRequest(message)
.subscribe (result) ->
  console.info "Processed: ", result

这个例子是用coffescript编写的,但我认为问题语句对任何其他Rx实现都是有效的。

这种方法的问题是loadMessages生成大量消息的速度非常快。这意味着,我在很短的时间内发出了很多HTTP请求。在我的情况下,这是不可接受的,所以我想将并行HTTP请求的数量限制在10个左右。换句话说,当我发出HTTP请求时,我想节流管道或应用某种反向结果。

Rx是否有任何标准方法或最佳实践来处理此类情况

目前,我实现了非常简单(而且非常次优(的反向结果机制,如果系统在处理中有太多按摩,它会忽略勾选。它看起来像这样(简化版(:

Rx.Observable.interval(1000)
.filter (tick) ->
  stats.applyBackpressureBasedOnTheMessagesInProcessing()
.do (tick) ->
  stats.messageIn()
.flatMap (tick) ->
  // returns an `Observable`
  loadMessages()
.flatMap (message) ->
  // also returns and `Observable`
  makeHttpRequest(message)
.do (tick) ->
  stats.messageOut()
.subscribe (result) ->
  console.info "Processed: ", result

不过,我不确定这是否可以做得更好,或者Rx已经有了一些机制来处理这种需求。

这不是严格意义上的背压,这只是限制并发性。这里有一个简单的方法(忽略我可能错误的语法,通过TextArea编码(:

Rx.Observable.interval(1000)
    .flatMap (tick) ->
        // returns an `Observable`
        loadMessages()
    .map (message) ->
        // also returns and `Observable`, but only when
        // someone first subscribes to it
        Rx.Observable.defer ->
            makeHttpRequest(message)
    .merge 10 // at a time
    .subscribe (result) ->
        console.info "Processed: ", result

在C#中,等效的思想是,它不是SelectMany,而是Select(Defer(x)).Merge(n)Merge(int)最多订阅n飞行中的Observables,并缓冲其余部分直到稍后。我们之所以有Defer,是为了让它在Merge(n)订阅我们之前不做任何工作。

在RXJS中,您可以使用背压子模块

http://rxjs.codeplex.com/SourceControl/latest#src/core/backpressure/

disclaimer我从未使用过JS的RX版本,但你确实要求提供一种实现背压的标准方法,核心库似乎支持它。RX c#还没有这种支持。不知道为什么。

听起来你想从队列中提取,而不是推送http请求。Rx真的是正确的技术选择吗?

编辑:

一般来说,我不会使用Rx设计一个解决方案,因为我对源事件有完全的命令式控制。这不是一个被动的场景。

Rxjs中的背压模块显然是为了处理您不拥有源流的情况而编写的。给你。

TPL数据流听起来更适合这里。

如果你必须使用RX,你可以设置这样的循环:如果你想限制X个并发事件,请设置一个Subject作为你的消息源,并强制向其中推送(OnNext(X条消息。在你的订阅服务器中,你可以在OnNext处理程序的每次迭代中向主题推送一条新消息,直到消息源用完。这保证了飞行中最多有X条消息。

相关内容

  • 没有找到相关文章

最新更新