如何从 KTable 获取排序的 KeyValueStore



我想从KStream实现一个KTable,我希望KeyValueStore按Key排序。

我尝试查找 KTable API 规范 (https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KTable.html(,但不存在"排序"方法。我还查找了这篇文章(https://dzone.com/articles/how-to-order-streamed-dataframes(,建议通过处理器 API 实现排序。但是,我正在检查是否可以以其他方式实现?

KafkaStream 允许您具体化可查询的状态存储。然后,您可以通过调用方法 kafkaStream#store() 来获取对存储的只读访问权限。

如果你定义了持久存储,KafkaStreams 将使用 RocksDB 来存储你的数据。返回的 KeyValueIterator 实例将使用 RocksDB 迭代器,该迭代器允许您以排序方式迭代键值 Rocks 迭代器实现。

例:

    KafkaStreams streams = new KafkaStreams(topology, props);
    ReadOnlyKeyValueStore<Object, Object> store = streams.store("storeName", QueryableStoreTypes.keyValueStore());
    KeyValueIterator<Object, Object> iterator = store.all();

使用密钥将事件添加到状态存储中。状态存储返回的 KeyValueIterator 以有序方式导航 KeyValue。

public class SortProcessor extends AbstractProcessor<String, Event> {
    private static Logger LOG = LoggerFactory.getLogger(SortProcessor.class);
    private final String stateStore;
    private final Long bufferIntervalInSeconds;
    // Why not use a simple Java NavigableMap? Check out my answer at : https://stackoverflow.com/a/62677079/2256618
    private KeyValueStore<String, Event> keyValueStore;
    public SortProcessor(String stateStore, Long bufferIntervalInSeconds) {
        this.stateStore = stateStore;
        this.bufferIntervalInSeconds = bufferIntervalInSeconds;
    }
    @Override
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        keyValueStore = (KeyValueStore) context().getStateStore(stateStore);
        context().schedule(Duration.ofSeconds(bufferIntervalInSeconds), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    }
    void punctuate(long timestamp) {
        LOG.info("Punctuator invoked...");
        try (KeyValueIterator<String, Event> iterator = keyValueStore.all()) {
            while (iterator.hasNext()) {
                KeyValue<String, Event> next = iterator.next();
                if (next.value == null) {
                    continue;
                }
                LOG.info("Sending {}", next.key);
                context().forward(null, next.value);
                keyValueStore.delete(next.key);
            }
        }
    }
    @Override
    public void process(String key, Event value) {
        Event event = Event.builder(value).payload(value.getPayload().toUpperCase()).build();
        keyValueStore.put(event.getEventType().name() + " " + event.getId(), event);
    }
    public static String getName() {
        return "sort-processor";
    }
}

可执行代码在这里。我在这里使用了一个简单的内存中状态存储。如果您预计短时间内会出现大量事件,则可以按照其他答案中已经建议的那样使用持久状态存储。

相关内容

  • 没有找到相关文章

最新更新