当从一个主题中加载globalktable在不同主题中使用相同的键时会发生什么



我们有一个带有一个分区的压实主题,我们在其中添加了一个新分区。

我们没有重新分配现有数据 - 这意味着在添加新分区之前加载的事件仍在分区0中。正如预期的那样,根据标准政策存储新事件:所有带有相同键的事件都进入同一分区。

我们目前处于这样的案例中:

Partition    Offset    Timestamp      Key         Value
0            586       1545388284240  COD_ISIN    AAA
1            983       1551800369978  COD_ISIN    BBB
1            1141      1556526044144  COD_ISIN    CCC

当我将该主题加载到GlobalKTable中时,商店中的值为AAA。我们显然希望将CCC作为当前值。

GlobalKTable<String, JsonNode> storeDatacatalog = builder.globalTable(TOPIC, consumed,  Materialized.as(STORE_DATACATALOG));
KStream<String, JsonNode> inEvent = builder.stream(OTHER_TOPIC, consumed);
inEvent = inEvent.transform(
    new TransformerSupplier<String, JsonNode, KeyValue<String, JsonNode>>() {
        @Override
        public Transformer<String, JsonNode, KeyValue<String, JsonNode>> get() {
            return new Transformer<String, JsonNode, KeyValue<String, JsonNode>>() {
                private ProcessorContext context;
                private KeyValueStore<String, JsonNode> dataCatalogueState;
                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                    this.dataCatalogueState = (KeyValueStore<String, JsonNode>) context.getStateStore(STORE_DATACATALOG);
                    LOGGER.debug("Content of dataCatalogueState: ");
                    KeyValueIterator<String, JsonNode> allDc = this.dataCatalogueState.all();
                    JsonNode valueForIsin = null;
                    while (allDc.hasNext()) {
                        try {
                            KeyValue<String, JsonNode> next = allDc.next();
                            LOGGER.debug(" | " + next.key + " : " + next.value);
                            if ("COD_ISIN".equals(next.key)) 
                                valueForIsin = next.value;
                        } catch (Exception e) {
                            LOGGER.debug("exc" , e.getMessage());
                        }
                    }
                    LOGGER.info(" COD_ISIN ---> " + valueForIsin);
                }
                @Override
                public void close() {
                }
                @Override
                public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
                    return new KeyValue<>(key, value);
                }
                //@Override
                public KeyValue<String, JsonNode> punctuate(long timestamp) {
                    // TODO Auto-generated method stub
                    return null;
                }
            };
        }
    }
)

GlobalKTable如何构建其状态?是基于Offset还是Timestamp它是否在内部粘贴找到钥匙的第一个分区的钥匙?

我知道如何解决问题(清除主题并再次填充 - 将应用分区策略(。但是我对它的内部工作方式感到好奇。

GlobalKTable假设数据通过键分区。因此,如果您在不同分区中具有相同键的记录,则不能保证应用订单记录。每个分区仅保证订单。除此之外,ATM,更新仅基于分区中的偏移。

使用上面的示例,订单可以为

  • AAA,BBB,CCC
  • BBB,AAA,CCC
  • BBB,CCC,AAA

只有保证,将在CCC之前应用BBB。

相关内容

  • 没有找到相关文章

最新更新