我的kafka接收器连接器读取了多个主题(配备了10个任务(,并从所有主题中处理了300多个记录。根据每个记录中保存的信息,连接器可以执行某些操作。
以下是键:触发记录中的值对的示例:
"REPROCESS":"my-topic-1"
阅读此记录后,我需要将主题" My-Topic-1"的偏移重置为每个分区中的0。
我已经阅读了许多地方创建新的KafkaConsumer
,订阅了主题分区,然后调用subscribe(...)
方法是推荐的方法。例如,
public class MyTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) {
records.forEach(record -> {
if (record.key().toString().equals("REPROCESS")) {
reprocessTopicRecords(record);
} else {
// do something else
}
});
}
private void reprocessTopicRecords(SinkRecord record) {
KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer =
new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// do offset reset here
}
}
);
}
}
但是,以上策略对我的案例不起作用,因为:1.这取决于发生的团体重新平衡(并非总是发生(2.传递给onPartitionsAssigned
方法的"分区"是动态分配的分区,这意味着这些仅是一组完整分区的子集,需要将其偏移重置。例如,此sinktask将仅分配8个保存" My-Topic-1"记录的8个分区中的2个。
我还考虑使用assign()
,但这与SinkConnector/SinkTask实现中的分布式消费者模型(消费者组(不兼容。
我知道KAFKA命令行工具kafka-consumer-groups
可以做我想要的(我认为(:https://gist.github.com/marwei/cd40657c481f94ebe273ecc166601674b
总而言之,我想使用Java API重置给定主题的所有分区的偏移,然后让接收器连接器拾取偏移更改并继续执行它一直在做的事情(处理记录(。
预先感谢。
我能够通过使用一系列Confluent的Kafka-rest-Proxy API来实现Kafka Connect消费者组的重置偏移:https://docs.confluent.io/current/current/kafka-rest/api.html
此实现不再需要原始帖子中描述的"触发记录"方法FIR,并且纯粹基于REST API。
-
暂时删除kafka连接器(这将删除连接器的消费者和(
-
为同一消费者组创建一个消费者实例("连接 - "(
-
让实例订阅您要重置的请求的主题
-
做一个虚拟的民意调查('subscribe'被评估了'(
-
重置指定主题的消费者组主题偏移
-
做一个虚拟的民意调查(懒惰的'seky'(对消费者的当前偏移状态(在代理中( 重新创建Kafka连接器(具有相同的连接器名称( - 重新平衡后,消费者将加入该组并阅读最后的偏移量(从0开始(
删除临时消费者实例
如果您能够使用CLI,则步骤2-6可以替换为:
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
对于那些试图通过本机Java API在Kafka连接器代码中执行此操作的人,您不幸: - (
您正在寻找Seek方法。要么偏移
consumer.seek(new TopicPartition("topic-name", partition), offset);
或seekToBeginning
但是,我觉得您会与Connect Api API的消费者组竞争。换句话说,假设您使用单独的组ID设置消费者,那么您本质上是从源主题中两次消耗记录,一次是通过连接,然后是您自己的消费者实例。
除非您也明确寻求连接自己的消费者实例(未暴露(,否则您将进入一个怪异的状态。例如,尽管您自己的消费者会在寻找旧的偏移量,否则您的任务仅在新记录上执行,否则您仍将在处理旧事件时进行旧事件。
此外,最终,由于保留政策,您可能会在主题的一开始就获得一个重新赛事活动,例如,旧记录到期,使您的消费者根本不进步,并通过寻求一开始就不断地重新平衡其小组。
我们必须进行非常相似的偏移重置练习。
KafkaConsumer.seek()
与KafkaConsumer.commitSync()
合并良好。
如果您要处理很多主题和分区(Javadoc(,还有其他值得一提的选择。
AdminClient.alterConsumerGroupOffsets(
String groupId,
Map<TopicPartition,OffsetAndMetadata> offsets
)
我们很幸运,因为我们有一段时间停止Kafka Connect实例的奢侈品,因此没有消费者组竞争。