使用KAFKA版本:2.0.1&kafka-streams-scala版本2.0.1
日志调试消息,例如:
调试2019-05-08 09:57:53,322 [he.kafka.clients.networkclient] [
] [ ]: [消费者 clientId = xxx-bd6b071d-a44f-4253-a3a5-539d60a72dddd3-streamthread-1-消费者, groupId = xxx]由于要求而与节点yyy断开连接 超时。"
导致我增加了请求。Timeout.ms值:
private val config: Properties = new Properties
config.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, "240000")
...
private val streams: KafkaStreams = new KafkaStreams(topology, config)
但是,这将新值设置为AdminClientConfig
和ConsumerConfig
的240000ms(default request.time.timeout.ms值AdminClientConfig
和ConsumerConfig
实际上是不同的-120000ms和40000ms(。
是否有任何方法可以为AdminClientConfig
或ConsumerConfig
设置Kafka流配置值,而无需覆盖两个?
您可以将任何配置与 consumer.
或 admin.
的前缀仅应用于一个客户端。
还有main.consumer.
,restore.consumer.
和global.consumer.
,可以进一步区分不同的消费者。使用consumer.
作为前缀,将配置应用于所有消费者。
最后,还有producer.
前缀(只提到完整性(。
比较文档:https://docs.confluent.io/current/current/streams/developer-guide/config-streams.html#naming