如何通过Kafka Stream应用程序提高从Kafka并向前转到Kafka的性能



我的kafka流应用程序具有1.0.0 kafka流api。我有单个经纪人0.10.2.0 KAFKA和单个分区的单个主题。所有可配置的参数均相同,除了producter request.timeout.ms。我配置了Producer request.timeout.ms,5分钟以修复Kafka Streams程序在生产问题时会引发异常。

在我的流应用程序中,我阅读了Kafka的事件,对其进行处理并转发到同一Kafka的另一个主题。

计算统计数据后,我观察到处理时间为5%,剩余时间剩下95%的时间用于阅读&写作。

即使我在卡夫卡(Kafka)有数千万个事件,但有时卡夫卡(Kafka)的民意调查正在返回单位唱片,而卡夫卡(Kafka)民意调查正在返回数千张记录。

某些时候上下文向前介绍了更多时间将更少的记录发送到Kafka,而某个时候上下文则花费更少的时间将更多记录发送到Kafka。

我试图通过增加max.poll.records,poll.ms值来提高阅读性能。但是没有运气。

在阅读和转发时如何提高性能?Kafka的民意调查和前进将如何工作?哪些参数有助于提高性能?

以下是我的应用程序中很少有重要的生产者配置参数。

acks = 1
batch.size = 16384
buffer.memory = 33554432
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 240000
retries = 10
retry.backoff.ms = 100
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
transaction.timeout.ms = 60000
transactional.id = null

以下是我的应用程序中很少有重要的消费者配置参数:

auto.commit.interval.ms = 5000
auto.offset.reset = earliest
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
internal.leave.group.on.close = false
isolation.level = read_uncommitted
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 10000
metadata.max.age.ms = 300000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000

以下是我的应用程序中很少有重要的流配置参数:

application.server =
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
commit.interval.ms = 30000
connections.max.idle.ms = 540000
key.serde = null
metadata.max.age.ms = 300000
num.standby.replicas = 0
num.stream.threads = 1
poll.ms = 1000
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =

您可以通过控制键并增加主题分区的数量来引入操作中的并行性。

以上将增加Kafka流的数量以类似于处理。可以通过增加Kafka Streams应用程序的线程数来处理

您可以在不同的线程中创建多个Kafka消费者,并将其分配给同一消费者组。他们将并行消耗消息,不会丢失消息。

您如何发送消息?使用Kafka,您可以以火灾方式发送消息:它可以改善吞吐量。

producer.send(record);

ACKS参数控制在生产者可以将写入成功之前,必须收到多少个分区复制品。

如果您设置 ack = 0 生产者将不等待经纪人的答复,然后才假设消息已成功发送。但是,由于生产商不等待服务器的任何响应,因此它可以像网络支持一样快地发送消息,因此可以使用此设置来实现很高的吞吐量。

最新更新