扩展Kafka Connect SinkTask并从给定的偏移量开始消费



我想扩展SinkTask以创建我自己的接收器连接器。

如果我在刷新过程中保存偏移,并且下次启动接收器连接器时,我想继续读取保存的偏移,那么正确的方法是什么?

我尝试使用被覆盖的initialize(SinkTaskContext context)SinkTaskContext来分配我自己的偏移:

@Override
public void initialize(SinkTaskContext context) {
  HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
  ...
  context.offset(offsetMap);
}

但这不起作用,因为分区还没有分配。我得到了一个例外。

然后,我是否应该将上下文(来自initialize())保存到全局变量中,然后使用它在方法open(Collection<TopicPartition> partitions)(从SinkTask重写)中为其分配偏移量,就像我在initialize中所做的那样?例如:

@Override
public void open(Collection<TopicPartition> partitions) {
  HashMap<TopicPartition, Long> offsetMapNew = new HashMap<>();
  for (TopicPartition tp : partitions) // for each partition assigned
  {
     Long offset = myOffsetMap.get(tp.topic() + "-" + tp.partition());
     if (offset == null) { offset = 0l; } // 0 Long
     offsetMapNew.put(tp, offset);
  }
  mySavedTaskContext.offset(offsetMapNew); // sync offsets ?
}

open()期间重置偏移量应该是正确的方法,但由于一个尚未解决的错误,目前无法正确处理。

目前的解决方法是处理put()中的重置偏移。这可能有点违反直觉,但由于您正在管理自己的偏移,如果您愿意,实际上可以忽略数据。当您得到第一个put()调用时,您可以处理加载偏移并重置它们。所有后续数据都将来自重置时指定的偏移量。这就是HDFS连接器当前实现一次交付的方式。(不幸的是,这是一个很好的例子,说明了如何只获得一次,但相对复杂的代码。)事实上,由于HDFS连接器是Kafka Connect中偏移管理功能的驱动因素,因此它在重新平衡时不进行重置的事实正是实现中错过的原因。

相关内容

最新更新