在本地执行模式下停止/启动 Kafka 使用者/生产者流



Setup:

  • 爪哇 8
  • Flink 1.2 (Mac OSX(
  • Kafka 0.10.0 (VirtualBox/Ubuntu(
  • FlinkKafka消费者010
  • Flink卡夫卡制作人010

创建了一个简单的示例程序,用于从一个 Kafka 主题消费 1M 条消息并生成到另一个 - 在本地执行模式下运行。 这两个主题都有 32 个分区。

当我让从头到尾运行时,它会消耗并生成所有消息。 如果我在完成之前启动然后停止(SIGINT(,然后再次重新启动,则生产者仅接收原始1M消息的子集。

我已经确认了消费者的偏移量,它读取了所有 1M 条消息。

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(32);
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);

--

producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);

在本地执行模式下,这是预期的吗? 是否需要启用保存点才能停止和重新启动流作业? 发生这种情况时,我似乎生产者没有提交所有消息。

提前感谢!

首先,在后续运行中,它只接收消息的子集,因为FlinkKafkaConsumer使用 Kafka 中提交的偏移量作为起始位置。目前,在发行版中1.2.0避免这种情况的唯一方法是始终分配一个新的group.id .在下一个版本中,将有新的选项:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration。

作为旁注,还请注意,Kafka 中提交的偏移量根本不用于 Flink 中的精确一次处理保证。Flink 只依赖于检查点偏移量。有关这方面的更多详细信息可以在上面链接中的 Flink Kafka 连接器文档中找到。

相关内容

  • 没有找到相关文章

最新更新