Flink kafka - Flink 作业不向不同的分区发送消息



>我有以下配置:

  1. 一个具有 2 个分区的 kafka 主题
  2. 一个动物园管理员实例
  3. 一个卡夫卡实例
  4. 具有相同组 ID 的两个使用者

Flink 作业片段:

speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new 
SimpleStringSchema(), props));

场景 1:

我在 eclipse 上写了一个 flink 作业(生产者(,它正在从文件夹中读取文件并将 msgs 放在 kafka 主题上。

因此,当我使用 eclipse 运行此代码时,它工作正常。

例如:如果我放置一个包含 100 条记录的文件,flink 向分区 1 发送少量消息,向分区 2 发送少量消息,因此两个消费者都得到很少的消息。

场景 2:当我创建上述代码的 jar 并在 flink 服务器上运行时,flink 将所有消息发送到单个分区,因此只有一个消费者获取所有消息。

我希望使用在场景 1 中创建的 jar 2。

对于 Flink-Kafka Producers,添加 "null" 作为最后一个参数。

speStream.addSink(new FlinkKafkaProducer011(
kafkaTopicName,
new SimpleStringSchema(),
props,
(FlinkKafkaPartitioner) null)
);

对此的简短解释是,这会关闭 Flink 使用默认分区程序FlinkFixedPartitioner。将其作为默认值关闭将允许 Kafka 在其分区之间分配它认为合适的数据。如果未关闭此功能,则用于使用 FlinkKafkaProducer 的接收器的每个并行度/任务槽将只写入每个并行度/任务槽的一个分区。

如果你不提供FlinkKafkaPartitioner或者没有明确表示要使用 Kafka 的FlinkFixedPartitioner,这意味着来自一个任务的所有事件最终将位于同一个分区中。

要使用 Kafka 的分区程序,请使用以下 ctor:

speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new SimpleStringSchema(), props), Optional.empty());

从 IDE 和 eclipse 运行之间的差异可能是因为 Flink 中的并行性或分区设置不同。

相关内容

  • 没有找到相关文章

最新更新