apache flink -如何在生产到kafka sink后获得确认,即使用FlinkKafkaProducer09


stream.addSink(new FlinkKafkaProducer09<String>("Topic-name", new SimpleStringSchema(), 
properties));

是否有任何回调或完成FlinkkafkaProducer?

你可以试着给生产者发个消息。

一旦你发送了消息,你可以在它上面写一个回调。虽然这是一个昂贵的操作,你应该尽量避免它。

send()方法是异步的。调用时,它将记录添加到等待记录的缓冲区发送并立即返回。这允许为了提高效率,生产者可以将单个记录批处理在一起。

你可以看看它的用法:

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
           new Callback() {
               public void onCompletion(RecordMetadata metadata, Exception e) {
                   if(e != null)
                       e.printStackTrace();
                   System.out.println("The offset of the record we just sent is: " + metadata.offset());
               }
           });

详细信息请参考文档

相关内容

  • 没有找到相关文章

最新更新