Scala中具有未来的生产者-消费者



假设我有一个"生产者-消费者"问题:生产者向消费者发送消息,消费者使用Scala Futures异步处理消息:例如future { /* do the processing */ }

假设生产者每秒产生100条消息,而消费者每秒只处理10条消息,会发生什么?我想会有内存泄漏。将会有很多Future对象,线程池的内部消息队列也会增长。这有道理吗?

最好的处理方法是什么?

在akka中,使用了执行上下文,但似乎没有邮箱-阅读源代码是值得的,但我可以通过实验来回答您的问题:

Future没有"邮箱",我也不能100%确定Akka在后台做了什么,或者执行上下文实际上包含了什么,但我们可以看到,当直接使用Future时,Akka会耗尽内存:

scala> import scala.concurrent.Future
import scala.concurrent.Future
scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global                  ^
scala> while(1==1) Future(Thread.sleep(100))
java.lang.OutOfMemoryError: Java heap space

如果我们谈论的是消息,那么有一个邮箱描述了参与者消息队列的行为(一次只处理一条消息,就会填充)——我将在下面对此进行解释。

假设有一个有限制的邮箱(如有大小限制的邮箱),消息会发生什么。答案是取决于邮箱。首先,受限邮箱有一些设置,例如大小限制:

bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1000
  mailbox-push-timeout-time = 10s
}

现在,当达到该限制时,akka将根据邮箱的配置方式丢弃旧邮件或新邮件,例如使用此设置

# whether to drop older items (instead of newer) when the queue is full
discard-old-when-full = on

显然,如果存在其他资源问题,如内存不足,那么应用程序可能会崩溃,这意味着消息在存储在内存中时会丢失。无限制邮箱将继续堆叠邮件,直到出现错误,这就是您可能希望使用有限制邮箱的原因。

如果在错误情况下丢失邮件是不可取的,那么还有另一个选项=可以使用持久邮箱,将邮件存储在更持久的地方,例如文件中。下面是一个示例邮箱配置,它使用文件来实现更持久的邮件存储。

akka {
  actor {
    mailbox {
      file-based {
        # directory below which this queue resides
        directory-path = "./_mb"
        # attempting to add an item after the queue reaches this size (in items)
        # will fail.
        max-items = 2147483647
        # attempting to add an item after the queue reaches this size (in bytes)
        # will fail.
        max-size = 2147483647 bytes
        # attempting to add an item larger than this size (in bytes) will fail.
        max-item-size = 2147483647 bytes
        # maximum expiration time for this queue (seconds).
        max-age = 0s
        # maximum journal size before the journal should be rotated.
        max-journal-size = 16 MiB
        # maximum size of a queue before it drops into read-behind mode.
        max-memory-size = 128 MiB
        # maximum overflow (multiplier) of a journal file before we re-create it.
        max-journal-overflow = 10
        # absolute maximum size of a journal file until we rebuild it,
        # no matter what.
        max-journal-size-absolute = 9223372036854775807 bytes
        # whether to drop older items (instead of newer) when the queue is full
        discard-old-when-full = on
        # whether to keep a journal file at all
        keep-journal = on
        # whether to sync the journal after each transaction
        sync-journal = off
        # circuit breaker configuration
        circuit-breaker {
          # maximum number of failures before opening breaker
          max-failures = 3
          # duration of time beyond which a call is assumed to be timed out and
          # considered a failure
          call-timeout = 3 seconds
          # duration of time to wait until attempting to reset the breaker during
          # which all calls fail-fast
          reset-timeout = 30 seconds
        }
      }
    }
  }
}

您可以设置最大队列大小。事实上,我认为Akka演员默认情况下有有限的队列,尽管我在这里可能错了。

这并不能真正解决问题,但最终,如果你没有足够的后端参与者来进行处理,你将无法处理所有内容。

我喜欢Netflix的做法:所有请求都通过代理来监控后端的运行状况。如果后端花费的时间太长,他们会丢弃请求并提供回退:要么是合理的默认值,要么是错误消息。他们谈论了很多关于他们的体系结构的内容,例如参见本演示。

有多个消费者-使用参与者池。你可以根据池水上的压力动态调整大小。看见http://doc.akka.io/docs/akka/snapshot/scala/routing.html

最新更新