Duplicate messages in Flink Kafka sink from Kafka CDC source using the table connector

I am trying to use Apache Flink 1.15.1 to read messages from two Kafka topics, transform them, and finally send them to another Kafka topic. I added savepoints so that the application's state is preserved across cancel and restart. The problem is that we get duplicate messages in the sink topic after a restart. For example, if the topic contains 10 messages after the job has run and we restart the job, we then see 20 messages in the topic. Please help.
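For reference, the job is stopped and resumed roughly like this (a minimal sketch; the savepoint directory, job ID, entry class, and jar name are placeholders):

# stop the job and write a savepoint
flink stop --savepointPath /tmp/flink-savepoints <jobId>

# resume the job from that savepoint
flink run -s /tmp/flink-savepoints/savepoint-<id> -c com.example.EnrichmentJob app.jar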

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);          
Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.setString("restart-strategy", "fixed-delay");
configuration.setString("restart-strategy.fixed-delay.attempts", "3");
configuration.setString("restart-strategy.fixed-delay.delay", "30s");
configuration.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
configuration.setString("execution.checkpointing.interval", "3min");
configuration.setString("execution.checkpointing.externalized-checkpoint-retention", "DELETE_ON_CANCELLATION");
configuration.setString("state.backend", "rocksdb");
configuration.setString("state.checkpoints.dir", "file:///tmp/checkpoints/");

String statement = "CREATE TABLE test1 (\r\n" + 
"  id STRING,\r\n" + 
"  sameAs STRING,\r\n" + 
"  PRIMARY KEY (id) NOT ENFORCED\r\n" + 
") WITH (\r\n" + 
"  'connector' = 'upsert-kafka',\r\n" + 
"  'topic' = 'source4',\r\n" + 
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" + 
"  'key.format' = 'raw',\r\n" + 
"  'value.format' = 'avro-confluent',\r\n" + 
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" + 
")";

tEnv.executeSql(statement);

String relStatement = "CREATE TABLE test2 (\r\n" + 
"  id STRING,\r\n" + 
"  correlationId STRING,\r\n" + 
"  PRIMARY KEY (correlationId) NOT ENFORCED\r\n" + 
") WITH (\r\n" + 
"  'connector' = 'upsert-kafka',\r\n" + 
"  'topic' = 'relationship4',\r\n" + 
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" + 
"  'key.format' = 'raw',\r\n" + 
"  'value.format' = 'avro-confluent',\r\n" + 
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" + 
")";
tEnv.executeSql(relStatement);
/*sink1*/
tEnv.executeSql("CREATE TABLE test3 (\r\n" +
"  id STRING,\r\n" + 
"  roleId STRING,\r\n" + 
"  PRIMARY KEY (id) NOT ENFORCED\r\n" + 
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'test4',\r\n" + 
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" + 
"  'properties.allow.auto.create.topics' = 'true',\r\n" + 
"  'key.format' = 'raw',\r\n" + 
"  'value.format' = 'json'\r\n" + 
")");       
String joinStat = "INSERT INTO test3 select ............. ";
tEnv.executeSql(joinStat);

I don't see any configuration for Kafka exactly-once guarantees, so you will get at-least-once guarantees: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/kafka/#consistency-guarantees

On the same page you can also find that if you want to achieve exactly-once guarantees, you need to add sink.delivery-guarantee and sink.transactional-id-prefix, as sketched below.
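A minimal sketch of the sink DDL with those two options added (assuming, per that page, they apply to this sink; the transactional-id prefix and the timeout value are placeholder choices):

tEnv.executeSql("CREATE TABLE test3 (\r\n" +
"  id STRING,\r\n" +
"  roleId STRING,\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'test4',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'properties.allow.auto.create.topics' = 'true',\r\n" +
// the producer transaction timeout must not exceed the broker's transaction.max.timeout.ms
"  'properties.transaction.timeout.ms' = '900000',\r\n" +
// the two options from the consistency-guarantees page; the prefix is a placeholder
"  'sink.delivery-guarantee' = 'exactly-once',\r\n" +
"  'sink.transactional-id-prefix' = 'test3-tx',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'json'\r\n" +
")");

With exactly-once delivery, records only become visible when the transaction is committed at a checkpoint, so downstream consumers should also read with isolation.level = read_committed, otherwise they can still see uncommitted writes.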
