我想在kafka生产者中跟踪记录的时间戳:记录何时由生产者发送,代理何时接收ack并存储记录(ack时间戳)。每条记录的头中都有字符串id。
对我来说,最好的方法是实现Kafka ProducerInterceptor:
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
在onAcknowledgement
我有RecordMetadata,可以得到ack时间戳,但我无法理解我如何能匹配哪个记录我接收元数据?从KIP-512中我可以看出,这是一个普遍的问题,正在讨论中。
如何核对onAcknowledgement中的记录?或者可能有更好的方法来跟踪记录的发送/接收时间戳?
不,我不能使用Producer callback,因为我不能改变Producer代码。
这个怎么样?
定义一个类型,可以保存一个生产者发送调用的所有相关数据:
class ProducerSendData<K, V>
{
ProducerRecord<K, V> parameter;
Future<RecordMetadata> result;
RecordMetadata acknowledgeMetaData;
Exception acknowledgeException;
}
定义一个回调类型,当通过回调方法onCompletion(…)可用时,将ProducerSendData与确认数据耦合:
class ProducerSendDataCallback<K, V> implements Callback
{
// constructor injection ...
ProducerSendData<K, V> producerSendData;
/** updates {@link #producerSendData} with method acknowledgement metadata and exception */
@Override public void onCompletion(RecordMetadata metadata, Exception exception)
{
producerSendData.acknowledgeMetaData = metadata;
producerSendData.acknowledgeException = exception;
}
}
然后调用kafka producer send方法并使用返回的future更新producerSendData:
ProducerSendData producerSendData = new ProducerSendData(new ProducerRecord(whatever);
ProducerRecord methodParameter = producerSendData.parameter;
ProducerSendDataCallback<K, V> methodCallback = new ProducerSendDataCallback<>(producerSendDataItem);
Future<RecordMetadata> methodResult = producer.send(methodParameter, methodCallback);
producerSendData.methodResult = methodResult;
有了这个,你应该有所有相关的东西。ProducerSendData。结果未来值可能只在未来可用,当然;)
我希望这是可以理解的。干杯!