现在我的 kafka 生产者正在将所有消息下沉到一个实际上具有多个分区的 kafka 主题的单个分区中。
如何创建一个生产者,该生产者将使用默认分区程序并在主题的不同分区之间分发消息。
我的 kafka 生产者的代码片段:
Properties props = new Properties();
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap.servers);
props.put(ProducerConfig.ACKS_CONFIG, "all");
我正在使用 flink 卡夫卡制作器来接收有关 kafka 主题的消息。
speStream.addSink(
new FlinkKafkaProducer011(kafkaTopicName,
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
props,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)
使用默认分区程序,使用以下逻辑为消息分配分区:
键控消息:生成键的哈希值,并基于该哈希值选择分区。这意味着具有相同键的消息最终将位于同一分区上
无键消息:轮循机制用于分配分区
解释您看到的行为的一个选项是,如果您对所有消息使用相同的键,那么使用默认分区程序,它们最终将位于同一分区上。
通过将 flinkproducer 更改为
speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new SimpleStringSchema((, props((;