我正在尝试使用S3 Sink Connector从Kafka Topic中消费消息。
当特定数量的记录被处理并写入S3对象时,我想关闭这个连接器并删除Kafka Topic。
需要消耗的记录数可以从一个ksqlDB表中查询。
在Kafka Connect框架中没有可用的数字触发器来执行此操作。
你可以写一个Lambda来读取写到S3的每个文件,因为Connect写它们,计算每个文件的记录数量(并将值持久化在DynamoDB或Elasticache中),然后当你达到你的限制时,你可以发送一个HTTP调用来删除连接器,和kafkaAdminClient
delete-topics动作。
ksqlDB有自己的消费者和计数器,并且不从S3读取数据来知道实际写了多少事件。