>我有以下配置:
- 一个具有 2 个分区的 kafka 主题
- 一个动物园管理员实例
- 一个卡夫卡实例
- 具有相同组 ID 的两个使用者
Flink 作业片段:
speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new
SimpleStringSchema(), props));
场景 1:
我在 eclipse 上写了一个 flink 作业(生产者(,它正在从文件夹中读取文件并将 msgs 放在 kafka 主题上。
因此,当我使用 eclipse 运行此代码时,它工作正常。
例如:如果我放置一个包含 100 条记录的文件,flink 向分区 1 发送少量消息,向分区 2 发送少量消息,因此两个消费者都得到很少的消息。
场景 2:当我创建上述代码的 jar 并在 flink 服务器上运行时,flink 将所有消息发送到单个分区,因此只有一个消费者获取所有消息。
我希望使用在场景 1 中创建的 jar 2。
对于 Flink-Kafka Producers,添加 "null" 作为最后一个参数。
speStream.addSink(new FlinkKafkaProducer011(
kafkaTopicName,
new SimpleStringSchema(),
props,
(FlinkKafkaPartitioner) null)
);
对此的简短解释是,这会关闭 Flink 使用默认分区程序FlinkFixedPartitioner
。将其作为默认值关闭将允许 Kafka 在其分区之间分配它认为合适的数据。如果未关闭此功能,则用于使用 FlinkKafkaProducer 的接收器的每个并行度/任务槽将只写入每个并行度/任务槽的一个分区。
如果你不提供FlinkKafkaPartitioner
或者没有明确表示要使用 Kafka 的FlinkFixedPartitioner
,这意味着来自一个任务的所有事件最终将位于同一个分区中。
要使用 Kafka 的分区程序,请使用以下 ctor:
speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new SimpleStringSchema(), props), Optional.empty());
从 IDE 和 eclipse 运行之间的差异可能是因为 Flink 中的并行性或分区设置不同。