Kafka流:由于在还原过程中更改对数状态的变化而无法重新平衡



需要一些帮助,以找出我在Kafka流消费者之一中收到的例外。

我已经实现了具有低级处理器API的KAFKA流。对于我们从Kafka收到的每个更新,都将其合并并更新到KeyStore,以便保持状态。最初,我们只运行了一个消费者,一段时间后我们尝试提出第二个消费者。但是第二消费者在重新平衡期间抛出了一个例外,表明它没有重新平衡。发生这种情况是因为变更日志的状态发生了变化(下面的例外共享)。我认为,当重新平衡发生时,第一个消费者收到了一些更新,因此将更新推向相应的更改日志。请帮忙。也共享相同的示例代码。 我正在使用kafka 2_11 0.10.2.1,主题有72个分区

异常

    Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
    Caused by: java.lang.IllegalStateException: task [0_60] Log end offset of Kafka-xxxxxxxxxxxxxxxx-InfoStore-changelog-60 should not change while restoring: old end offset 80638, current offset 80640
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:252)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:56)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)

代码段

    public class InfoProcessor extends AbstractProcessor<Key, Update> {
private static Logger logger = Logger.getLogger(InfoProcessor.class);
private ProcessorContext context;
private KeyValueStore<Key, Info> infoStore;
private int visitProcessorInstanceId;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
    infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
}
@Override
public void process(Key key, Update update) {
    try {
        if (key != null && update != null) {
            Info info = infoStore.get(key);
            // merge logic
            infoStore.put(key, info);
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    } finally {
    }
    context.commit();
}
@Override
public void punctuate(long timestamp) {
    try {
        KeyValueIterator<Key, Info> iter = this.infoStore.all();
        while (iter.hasNext()) {
            // processing logic
        }
        iter.close();
        context.commit();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}

}

谢谢。

您的观察值正确。如果国家迁移造成的重新平衡需要很长时间,并且发生另一个重新平衡,则可能发生这种情况:

  1. 第一个实例正在运行
  2. 第二个实例开始,触发重新平衡
    • 第二个实例重新创建状态
  3. 另一个重新平衡发生(不确定如何在您的情况下触发)
    • 第二个实例仍在重新创建状态,并且不会重新加入该组(因此它退出了组)
  4. 第一个实例将状态迁移回去(它仍然具有状态的完整副本,因此没有重新创建 - 第二个实例还没有开始处理任何内容),然后恢复写入ChangElog主题
  5. 第二个实例死于例外

您可以验证吗?如果是,只要州娱乐正在运行,您就需要避免第二次重新平衡。

btw:此行为已经改善了trunk,并且将在即将发布的0.11.0.1版本中修复。您可以将KAFKA流应用程序更新为0.11.0.1,而无需升级经纪人。0.11.0.1应在接下来的几周内发布。

相关内容

最新更新