在 Apache Flink 中是否有相当于 Kafka 的 KTable?



Apache Kafka有一个KTable的概念,其中

其中每个数据记录代表一个更新

本质上,我可以使用kafka主题,并且只保留每个密钥的最新消息。

Apache Flink中有类似的概念吗?我读过Flink的Table API,但似乎并没有解决同样的问题。

比较和对比这两个框架会有所帮助。我不是在寻找更好还是更坏。而是它们之间的区别。答案是正确的将取决于我的要求。

你说得对。Flink的Table API及其Table类与Kafka的KTable不对应。表API是一种嵌入API的关系语言(想想Java和Scala中集成的SQL)。

Flink的DataStream API没有对应于KTable的内置概念。相反,Flink提供了复杂的状态管理,KTable将是具有键控状态的常规操作员。

例如,具有两个输入的有状态运算符存储从第一个输入观察到的最新值,并将其与来自第二个输入的值连接,可以用CoFlatMapFunction实现,如下所示:

DataStream<Tuple2<Long, String>> first = ...
DataStream<Tuple2<Long, String>> second = ...
DataStream<Tuple2<String, String>> result = first
// connect first and second stream
.connect(second)
// key both streams on the first (Long) attribute
.keyBy(0, 0)
// join them
.flatMap(new TableLookup());
// ------
public static class TableLookup 
extends RichCoFlatMapFunction<Tuple2<Long,String>, Tuple2<Long,String>, Tuple2<String,String>> {
// keyed state
private ValueState<String> lastVal;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<String> valueDesc = 
new ValueStateDescriptor<String>("table", Types.STRING);
lastVal = getRuntimeContext().getState(valueDesc);
}
@Override
public void flatMap1(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
// update the value for the current Long key with the String value.
lastVal.update(value.f1);
}
@Override
public void flatMap2(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
// look up latest String for current Long key.
String lookup = lastVal.value();
// emit current String and looked-up String
out.collect(Tuple2.of(value.f1, lookup));
}
}

一般来说,state可以非常灵活地与Flink一起使用,让我们实现广泛的用例。还有更多的状态类型,如ListStateMapState,对于ProcessFunction,您可以对时间进行细粒度控制,例如,如果某个键在一定时间内没有更新,则可以删除该键的状态(据我所知,KTables有相应的配置)。

相关内容

  • 没有找到相关文章

最新更新