Flink Kafka生产商的恰恰在一面语义上



i,m试图用kafka source和sink恰好测试flink的语义:

  1. 运行Flink应用程序,只需将消息从一个主题传输到另一个主题,并以平行性= 1,检查点间隔20秒
  2. 每2秒钟使用Python脚本生成带有整数数字的消息。
  3. 在read_comments隔离级别中使用控制台消费者读取输出主题。
  4. 手动杀死Taskmanager

我希望无论任务管理员杀人和恢复,都会看到输出主题中单调的整数增加。

,但实际上是一个在控制台 - 消费者输出中看到的意外事物:

32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45

看起来像在输出主题中重播的检查点之间的所有消息一样。它应该是正确的行为还是我做错了什么?

恢复了一个快照:Flink UI

我的flink代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);
        env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));
        Properties producerProperty = new Properties();
        producerProperty.setProperty("bootstrap.servers", ...);
        producerProperty.setProperty("zookeeper.connect", ...);
        producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"10000");
        producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
        producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        Properties consumerProperty = new Properties();
        consumerProperty.setProperty("bootstrap.servers", ...);
        consumerProperty.setProperty("zookeeper.connect", ...);
        consumerProperty.setProperty("group.id", "test2");
        FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>("stringTopic1", new ComplexStringSchema(), consumerProperty);
        consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
        FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>("test",  new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        producer1.ignoreFailuresAfterTransactionTimeout();
        DataStreamSource<String> s1 = env.addSource(consumer1);
        s1.addSink(producer1);
        env.execute("Test");
    }

除了设置精确语义的生产者之外,您还需要配置消费者以仅读取KAFKA的订单消息。默认情况下,消费者将读取订婚和未投入的消息。将此设置添加到您的消费者中应该使您更接近所需的行为。

consumerProperties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

flink在常规的可配置间隔上生成检查点。恢复检查点后,Flink将状态倒退到最后一次检查的输入流中的位置(不一定与上次处理/消耗相同(。有多种方法可以确保恰当的语义。您可以使用完全支持一开始语义的生产商(接收器(,请参见:flink sinks中的容错保证。

另外,您可以在消费者中完全支持一开始的语义。假设有多个工人(并行> 1(持续存在的独特整数,这是确保确切连接处理的一种方法:

  1. 假设当前检查点ID为ckpt n。在CKPT N的状态下存储所有处理的整数(在大事件的情况下,已处理事件的指纹(。您可以通过让消费者在ListCheckpointed接口上以存储存储来实现这一目标。ckptn。

  2. 中的状态(指纹或整数(
  3. 一旦Flink移动到Next检查点(CKPT n 1(,请滤除存储在CKPT N状态中的所有整数,以确保精确处理。将未过滤的处理整数(或已处理事件的指纹(存储在CKPT N 1的状态(即放弃CKPT N的状态(。

您只需要存储两个检查点之间发生的已处理事件的指纹(或整数(,然后在持续一个新检查点时丢弃。

相关内容

  • 没有找到相关文章

最新更新