I'm trying to use Apache Flink 1.15.1 to read messages from two Kafka topics, transform them, and finally send them to another Kafka topic. I take savepoints so that the application's state is preserved across cancel and restart. The problem is that we get duplicate messages in the output topic after a restart. For example, if the topic contains 10 messages after the job has run and we restart the job, we then see 20 messages in the topic. Please help me out.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.setString("restart-strategy", "fixed-delay");
configuration.setString("restart-strategy.fixed-delay.attempts", "3");
configuration.setString("restart-strategy.fixed-delay.delay", "30s");
configuration.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
configuration.setString("execution.checkpointing.interval", "3min");
configuration.setString("execution.checkpointing.externalized-checkpoint-retention", "DELETE_ON_CANCELLATION");
configuration.setString("state.backend", "rocksdb");
configuration.setString("state.checkpoints.dir", "file:///tmp/checkpoints/");
String statement = "CREATE TABLE test1 (rn" +
" id STRING,rn" +
" sameAs STRING,rn" +
" PRIMARY KEY (id) NOT ENFORCEDrn" +
") WITH (rn" +
" 'connector' = 'upsert-kafka',rn" +
" 'topic' = 'source4',rn" +
" 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',rn" +
" 'key.format' = 'raw',rn" +
" 'value.format' = 'avro-confluent',rn" +
" 'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'rn" +
")";
tEnv.executeSql(statement);
String relStatement = "CREATE TABLE test2 (rn" +
" id STRING,rn" +
" correlationId STRING,rn" +
" ),rn" +
" PRIMARY KEY (correlationId) NOT ENFORCEDrn" +
") WITH (rn" +
" 'connector' = 'upsert-kafka',rn" +
" 'topic' = 'relationship4',rn" +
" 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',rn" +
" 'key.format' = 'raw',rn" +
" 'value.format' = 'avro-confluent',rn" +
" 'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'rn" +
")";
tEnv.executeSql(relStatement);
/* sink1 */
tEnv.executeSql("CREATE TABLE test3 (n" +
" id STRING,rn" +
" roleId STRING,rn" +
" PRIMARY KEY (id) NOT ENFORCEDrn" +
") WITH (n" +
" 'connector' = 'upsert-kafka',n" +
" 'topic' = 'test4',rn" +
" 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',rn" +
" 'properties.allow.auto.create.topics' = 'true',rn" +
" 'key.format' = 'raw',rn" +
" 'value.format' = 'json'rn" +
")");
String joinStat = "INSERT INTO test3 select............. ";
tEnv.executeSql(joinStat);
I don't see any configuration for Kafka exactly-once guarantees, so you will get at-least-once guarantees: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/kafka/#consistency-guarantees
On the same page you can also see that if you want exactly-once guarantees, you need to add sink.delivery-guarantee and sink.transactional-id-prefix.
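A minimal sketch of what that could look like for your sink table, assuming the upsert-kafka sink in your setup accepts these two options as the linked page describes; the prefix 'flink-test3-' is just an illustrative placeholder, pick one that is unique per application on the cluster:

tEnv.executeSql("CREATE TABLE test3 (\n" +
" id STRING,\n" +
" roleId STRING,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'upsert-kafka',\n" +
" 'topic' = 'test4',\n" +
" 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\n" +
" 'properties.allow.auto.create.topics' = 'true',\n" +
" 'key.format' = 'raw',\n" +
" 'value.format' = 'json',\n" +
// the two additional options for exactly-once delivery from the consistency-guarantees page;
// 'flink-test3-' is a hypothetical prefix value
" 'sink.delivery-guarantee' = 'exactly-once',\n" +
" 'sink.transactional-id-prefix' = 'flink-test3-'\n" +
")");

Keep in mind that with exactly-once delivery the sink writes records inside Kafka transactions that are committed only when a checkpoint completes, so downstream consumers should read with isolation.level=read_committed, and the producer's transaction.timeout.ms (capped by the broker's transaction.max.timeout.ms) has to be larger than your checkpointing interval.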