假设我有一个"生产者-消费者"问题:生产者向消费者发送消息,消费者使用Scala Futures
异步处理消息:例如future { /* do the processing */ }
。
假设生产者每秒产生100条消息,而消费者每秒只处理10条消息,会发生什么?我想会有内存泄漏。将会有很多Future
对象,线程池的内部消息队列也会增长。这有道理吗?
最好的处理方法是什么?
在akka中,使用了执行上下文,但似乎没有邮箱-阅读源代码是值得的,但我可以通过实验来回答您的问题:
Future没有"邮箱",我也不能100%确定Akka在后台做了什么,或者执行上下文实际上包含了什么,但我们可以看到,当直接使用Future时,Akka会耗尽内存:
scala> import scala.concurrent.Future
import scala.concurrent.Future
scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global ^
scala> while(1==1) Future(Thread.sleep(100))
java.lang.OutOfMemoryError: Java heap space
如果我们谈论的是消息,那么有一个邮箱描述了参与者消息队列的行为(一次只处理一条消息,就会填充)——我将在下面对此进行解释。
假设有一个有限制的邮箱(如有大小限制的邮箱),消息会发生什么。答案是取决于邮箱。首先,受限邮箱有一些设置,例如大小限制:
bounded-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
}
现在,当达到该限制时,akka将根据邮箱的配置方式丢弃旧邮件或新邮件,例如使用此设置
# whether to drop older items (instead of newer) when the queue is full
discard-old-when-full = on
显然,如果存在其他资源问题,如内存不足,那么应用程序可能会崩溃,这意味着消息在存储在内存中时会丢失。无限制邮箱将继续堆叠邮件,直到出现错误,这就是您可能希望使用有限制邮箱的原因。
如果在错误情况下丢失邮件是不可取的,那么还有另一个选项=可以使用持久邮箱,将邮件存储在更持久的地方,例如文件中。下面是一个示例邮箱配置,它使用文件来实现更持久的邮件存储。
akka {
actor {
mailbox {
file-based {
# directory below which this queue resides
directory-path = "./_mb"
# attempting to add an item after the queue reaches this size (in items)
# will fail.
max-items = 2147483647
# attempting to add an item after the queue reaches this size (in bytes)
# will fail.
max-size = 2147483647 bytes
# attempting to add an item larger than this size (in bytes) will fail.
max-item-size = 2147483647 bytes
# maximum expiration time for this queue (seconds).
max-age = 0s
# maximum journal size before the journal should be rotated.
max-journal-size = 16 MiB
# maximum size of a queue before it drops into read-behind mode.
max-memory-size = 128 MiB
# maximum overflow (multiplier) of a journal file before we re-create it.
max-journal-overflow = 10
# absolute maximum size of a journal file until we rebuild it,
# no matter what.
max-journal-size-absolute = 9223372036854775807 bytes
# whether to drop older items (instead of newer) when the queue is full
discard-old-when-full = on
# whether to keep a journal file at all
keep-journal = on
# whether to sync the journal after each transaction
sync-journal = off
# circuit breaker configuration
circuit-breaker {
# maximum number of failures before opening breaker
max-failures = 3
# duration of time beyond which a call is assumed to be timed out and
# considered a failure
call-timeout = 3 seconds
# duration of time to wait until attempting to reset the breaker during
# which all calls fail-fast
reset-timeout = 30 seconds
}
}
}
}
}
您可以设置最大队列大小。事实上,我认为Akka演员默认情况下有有限的队列,尽管我在这里可能错了。
这并不能真正解决问题,但最终,如果你没有足够的后端参与者来进行处理,你将无法处理所有内容。
我喜欢Netflix的做法:所有请求都通过代理来监控后端的运行状况。如果后端花费的时间太长,他们会丢弃请求并提供回退:要么是合理的默认值,要么是错误消息。他们谈论了很多关于他们的体系结构的内容,例如参见本演示。
有多个消费者-使用参与者池。你可以根据池水上的压力动态调整大小。看见http://doc.akka.io/docs/akka/snapshot/scala/routing.html