我有一个非常简单的设置来向Kafka:发送消息
var producerConfig = new ProducerConfig
{
BootstrapServers = "www.example.com",
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.ScramSha512,
SaslUsername = _options.SaslUsername,
SaslPassword = _options.SaslPassword,
MessageTimeoutMs = 1
};
var producerBuilder = new ProducerBuilder<Null, string>(producerConfig);
using var producer = producerBuilder.Build();
producer.Produce("Some Topic", new Message<Null, string>()
{
Timestamp = Timestamp.Default,
Value = "hello"
});
以前,此代码运行良好。今天它决定停止工作,我正在努力找出原因。我试图让制作人在未能传递消息时抛出异常,但它似乎从未崩溃。即使我输入了错误的用户名和密码,生产商仍然不会崩溃。在我的本地输出窗口中甚至没有一条日志线。当制作人从未显示任何问题时,我如何调试我的Kafka连接?
Produce是异步的,不是阻塞的,函数签名是
void Produce(string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>> deliveryHandler = null)
为了验证发送的消息是否无错误
您可以添加交付报告处理程序功能,例如
private void DeliveryReportHandler(DeliveryReport<int, T> deliveryReport)
{
if (deliveryReport.Status == PersistenceStatus.NotPersisted)
{
_logger.LogError($"Failed message delivery: error reason:{deliveryReport.Error?.Reason}");
_messageWasNotDelivered = true;
}
}
_messageWasNotDelivered = false;
_producer.Produce(topic,
new Message<int, T>
{
Key = key,
Value = entity
},
DeliveryReportHandler)
_producer.Flush(); // Wait until all outstanding produce requests and delivery report callbacks are completed
if(_messageWasNotDelivered ){
// handle non delivery
}
这个代码可以像这个一样对批量生产进行琐碎的调整
_messageWasNotDelivered = false;
foreach(var entity in entities){
_producer.Produce(topic,
new Message<int, T>
{
Key = entity.Id,
Value = entity
},
DeliveryReportHandler)
}
_producer.Flush(); // Wait until all outstanding produce requests and delivery report callbacks are completed
if(_messageWasNotDelivered ){
// handle non delivery
}
您可以将SetErrorHandler()
添加到ProducerBuilder中。它看起来是这样的:
var producerBuilder = new ProducerBuilder<Null, string>(producerConfig)
.SetErrorHandler(errorMessageString => .....);
在该lambda中设置断点,就可以在出现错误时中断。