Kafka生产者故障转移机制和推送到主题的数据验证



我已经写了一个代码,每天将数据推送到kafka主题,但有一些问题我不确定这个代码是否能够处理。我的职责是从保存1天数据的实时表中推送完整的数据(每天早上刷新(

我的代码将查询"select*fromytable",并将其逐个推送到kafka主题,因为在推送之前,我需要验证/更改每一行并推送到主题。

下面是我的生产者发送代码

Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sBOOTSTRAP_SERVERS_CONFIG);
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configProperties.put("acks", "all");
configProperties.put("retries", 0);
configProperties.put("batch.size", 15000);
configProperties.put("linger.ms", 1);
configProperties.put("buffer.memory", 30000000);
@SuppressWarnings("resource")
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);
System.out.println("Starting Kafka producer job  " + new Date());
producer.send(new ProducerRecord<String, String>(eventName, jsonRec.toString()), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
});

现在,我不知道在失败的情况下如何将数据再次推回到主题中。由于我已经从表中选择了所有的记录,其中很少有失败的,我不知道是哪一个。

以下是我想要解决的问题

  1. 如何只处理那些未被推送的记录,以避免重复记录被推送(避免冗余(。

  2. 如何验证推送的记录与表中的完全相同。我指的是数据的完整性。比如推送的数据大小和记录数。

您可以使用configProperties.put("enable.idempotence", true);-它将尝试重试失败的消息,但确保每个记录中只有一条保存在kafka中。注意,这意味着retries>0acks=allmax.in.flight.requests.per.connection>=0。查看详细信息https://kafka.apache.org/documentation/.

对于第二个问题-如果你的意思是你需要保存所有记录或不保存,那么你必须使用kafka交易,这会带来更多的问题,我建议阅读https://www.confluent.io/blog/transactions-apache-kafka/

最新更新