I'm currently on Kafka version 2.6, using the Streams API, and I have a question. When I start a stream, it logs the streams, admin, consumer, and producer configurations. I noticed something odd: even though I provide the configuration
streamsConfiguration.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
as shown above, I still see a different assignment strategy in the consumer and streams logs.
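For context, here is a minimal sketch of how that property was set, assuming an otherwise standard Streams setup (the application id, bootstrap servers, and topic names below are placeholders, not my real values):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "APPID");        // placeholder
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX:9092");  // placeholder
// the setting in question:
streamsConfiguration.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());

StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").to("output-topic");  // placeholder topology

KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();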
Here is the consumer log showing the consumer configuration:
2021-01-20 15:52:32.611 INFO 111980 --- [alytics.event-4] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 500
auto.offset.reset = none
bootstrap.servers = [XXX:9092, XXX:9092, XXX:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = APPID-dd747646-8b51-42b0-8ad9-2fb26435a588-StreamThread-2-restore-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 25000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
But I also see the following log:
2021-01-20 15:52:32.740 INFO 111980 --- [alytics.event-4] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = false
auto.commit.interval.ms = 500
auto.offset.reset = latest
bootstrap.servers = [XXX:9092, XXX:9092, XXX:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = APPID-dd747646-8b51-42b0-8ad9-2fb26435a588-StreamThread-2-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 25000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = DEBUG
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
When I check these two consumer logs, all I immediately notice is that their client.id values differ.
I'm a bit worried about whether I have actually enabled the CooperativeStickyAssignor.
What difference between these two consumers causes them to use different partition assignment strategies?
Is it normal to see different consumer configurations within the same Kafka Streams application?
Thanks
The first consumer log in your question is the one of the "restore" consumer, the consumer that manages state store recovery; you can find the word "restore" in its client.id. The second consumer log you show is the one of the main consumer, the one that actually reads your input topics. As the log shows, the strategy that consumer uses is the StreamsPartitionAssignor.
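Regarding the worry about whether the CooperativeStickyAssignor is enabled: for its main consumer, Kafka Streams hard-codes the StreamsPartitionAssignor and overwrites any user-supplied partition.assignment.strategy, which is exactly what your second log shows. Conceptually the effect is roughly the sketch below (a simplification for illustration, not the actual StreamsConfig source; userSuppliedConsumerProps is a made-up name):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;

// start from whatever consumer configs the user supplied (hypothetical variable)
Map<String, Object> mainConsumerProps = new HashMap<>(userSuppliedConsumerProps);
// Streams unconditionally replaces the assignor for its main consumer, so the
// CooperativeStickyAssignor you configured never takes effect here:
mainConsumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        StreamsPartitionAssignor.class.getName());

The restore consumer is different again: it runs with group.id = null and gets its partitions via manual assign() rather than subscribe(), so the partition.assignment.strategy it prints (your CooperativeStickyAssignor) is never actually exercised. So yes, seeing different consumer configurations within one Kafka Streams application is normal.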