我如何知道Kafka Topic中特定作业的所有消息都被S3 Sink Connector消耗了? &



我正在尝试使用S3 Sink Connector从Kafka Topic中消费消息。

当特定数量的记录被处理并写入S3对象时,我想关闭这个连接器并删除Kafka Topic。

需要消耗的记录数可以从一个ksqlDB表中查询。

在Kafka Connect框架中没有可用的数字触发器来执行此操作。

你可以写一个Lambda来读取写到S3的每个文件,因为Connect写它们,计算每个文件的记录数量(并将值持久化在DynamoDB或Elasticache中),然后当你达到你的限制时,你可以发送一个HTTP调用来删除连接器,和kafkaAdminClientdelete-topics动作。

ksqlDB有自己的消费者和计数器,并且不从S3读取数据来知道实际写了多少事件。

相关内容

  • 没有找到相关文章