如何在 akka-stream-kafka 中从配置(application.conf)创建生产者设置?



我正在尝试学习和使用akka-stream-kafka,并且正在浏览其[文档][1]。在"生产者设置"部分中,它告诉我们可以使用编程方式和配置创建ProducerSettings。有一个程序化构造的示例,但没有如何通过配置创建它的示例。程序化构造很简单,下面是一个示例。但是,我想使用配置基础结构,并希望配置来自application.conf因为它会给我更多的控制权。我似乎在谷歌上找不到它的例子。

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")

文档只是将您转发到 Apache Kafka Javadoc for ProducerConfig,因为它包含一堆常量,您可以在akka.kafka.producer.kafka-clientsconfig 部分中用作键。

从文档中扩展参考配置,一个例子是:

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout. 
akka.kafka.producer {
# Tuning parameter of how many sends that can run in parallel.
parallelism = 100
# How long to wait for `KafkaProducer.close`
close-timeout = 60s
# Fully qualified config path which holds the dispatcher configuration
# to be used by the producer stages. Some blocking may occur.
# When this value is empty, the dispatcher configured for the stream
# will be used.
use-dispatcher = "akka.kafka.default-dispatcher"
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers = "localhost:9092"
enable.auto.commit = true
auto.commit.interval.ms = 10000
acks = "all"
retries = 0
batch.size = 16384
}
}

默认情况下,application.conf文件的内容将由您的ActorSystem加载,因此每当您按如下所示创建ProducerSettings对象时,它都应该从akka.kafka.producer中获取配置。无需将配置显式传递给构造函数。

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)

最新更新