卡夫卡 - 制片人 - 手柄"failed to send"



我正在运行一个0.8 Kafka,并使用提供的Java API构建一个生产者
发送消息的API函数返回void。

有没有办法获取已发送消息的状态?是发送还是失败?

这对我们来说非常重要,因为我们正在从文件中读取消息,并且我们希望在发送完所有消息后删除该文件。但是,如果出现错误,一些消息没有发送,我删除了文件,这将导致一个非常重要的数据丢失。

您可以将生产者配置为等待它从Kafka集群(request.request.acks)获得n个acks,这样您就可以在删除源文件之前确保数据已正确提交。

如果真的你需要确保消息发送成功,你可能需要考虑让生产者同步(生产者.类型=同步)的替代方案。这样,你就可以捕捉到阻塞调用引发的任何异常,并采取相应的行动。send()引发的异常是kafka.commun.FailedToSendMessageException。

Kafka的Java API并不理想,希望对您有所帮助。

相关内容

最新更新