如何在Kafka Producer中启用onFailure回调



我正在写一个卡夫卡制作人,下面是我的课。

public class KafkaPublisher {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Autowired
KafkaProducer producer;
public void pushKafkaNotification(String partitionKey, String serializedKafkaNotification, String topic)
{
try {
producer.beginTransaction();
ProducerRecord<String, String> producerRecord = new ProducerRecord<String,String>(topic,partitionKey,serializedKafkaNotification);
Future<RecordMetadata> result = producer.send(producerRecord);
producer.commitTransaction();
}
catch (Exception e)
{
producer.abortTransaction();
log.error("Exception while pushing notification to kafka for user = {}", partitionKey,  e);
}
}
}

我如何在此启用onFailure回调,以便通过日志了解是否有故障?

替代解决方案,假设您不需要将参数记录到推送方法中。您应该从RecordMetadata中获得所需的所有信息。

producer.send(producerRecord, this);
public class KafkaPublisher implements Callback {
... 
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// ... 
} 
}

这就是你的做法。

producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e!=null)
{
log.error("Callback : Failed to push event to kafka for partition key, notification {] {}", partitionKey, serializedKafkaNotification, e);
}
}
});

最新更新