如何匹配 Kafka 制作人的记录和记录元数据



我想在kafka生产者中跟踪记录的时间戳:记录何时由生产者发送,代理何时接收ack并存储记录(ack时间戳)。每条记录的头中都有字符串id。

对我来说,最好的方法是实现Kafka ProducerInterceptor:

@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}

onAcknowledgement我有RecordMetadata,可以得到ack时间戳,但我无法理解我如何能匹配哪个记录我接收元数据?从KIP-512中我可以看出,这是一个普遍的问题,正在讨论中。

如何核对onAcknowledgement中的记录?或者可能有更好的方法来跟踪记录的发送/接收时间戳?

不,我不能使用Producer callback,因为我不能改变Producer代码。

这个怎么样?

定义一个类型,可以保存一个生产者发送调用的所有相关数据:

class ProducerSendData<K, V>
{
ProducerRecord<K, V>   parameter;
Future<RecordMetadata> result;
RecordMetadata         acknowledgeMetaData;
Exception              acknowledgeException;
}

定义一个回调类型,当通过回调方法onCompletion(…)可用时,将ProducerSendData与确认数据耦合:

class ProducerSendDataCallback<K, V> implements Callback
{
// constructor injection ...
ProducerSendData<K, V> producerSendData;
/** updates {@link #producerSendData} with method acknowledgement metadata and exception */
@Override public void onCompletion(RecordMetadata metadata, Exception exception)
{
producerSendData.acknowledgeMetaData  = metadata;
producerSendData.acknowledgeException = exception;
}
}

然后调用kafka producer send方法并使用返回的future更新producerSendData:

ProducerSendData               producerSendData = new ProducerSendData(new ProducerRecord(whatever);
ProducerRecord                 methodParameter  = producerSendData.parameter;
ProducerSendDataCallback<K, V> methodCallback   = new ProducerSendDataCallback<>(producerSendDataItem);
Future<RecordMetadata>         methodResult     = producer.send(methodParameter, methodCallback);
producerSendData.methodResult = methodResult;

有了这个,你应该有所有相关的东西。ProducerSendData。结果未来值可能只在未来可用,当然;)

我希望这是可以理解的。干杯!

最新更新