stream.addSink(new FlinkKafkaProducer09<String>("Topic-name", new SimpleStringSchema(),
properties));
是否有任何回调或完成FlinkkafkaProducer?
你可以试着给生产者发个消息。
一旦你发送了消息,你可以在它上面写一个回调。虽然这是一个昂贵的操作,你应该尽量避免它。
send()方法是异步的。调用时,它将记录添加到等待记录的缓冲区发送并立即返回。这允许为了提高效率,生产者可以将单个记录批处理在一起。
你可以看看它的用法:
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
});
详细信息请参考文档