使用 kafka 模板批处理记录



我有一个java服务器端应用程序,它侦听tcp连接,读取消息,处理并发布到kafka主题。我们使用带有 kafka tm 的事务性 kafka 模板来实现一次语义。当我们逐条记录提交时,这工作正常。我们希望基于事件(检查消息是否具有某个参数集,然后提交(或基于时间间隔提交一批记录。这是卡夫卡模板可能实现的吗?我们使用弹簧卡夫卡 2.1

一种解决方案:

您可以在不同的线程上运行KafkaTemplate.executeInTransaction(),并通过BlockingQueue或类似方式移交来自 TCP 侦听器的请求,队列在回调范围内poll()(可能超时(。

然后,当正确的条件适用时;退出回调以提交事务,然后启动另一个事务。

最新更新