Apache Kafka有一个KTable的概念,其中
其中每个数据记录代表一个更新
本质上,我可以使用kafka主题,并且只保留每个密钥的最新消息。
Apache Flink中有类似的概念吗?我读过Flink的Table API,但似乎并没有解决同样的问题。
比较和对比这两个框架会有所帮助。我不是在寻找更好还是更坏。而是它们之间的区别。答案是正确的将取决于我的要求。
你说得对。Flink的Table API及其Table
类与Kafka的KTable不对应。表API是一种嵌入API的关系语言(想想Java和Scala中集成的SQL)。
Flink的DataStream API没有对应于KTable的内置概念。相反,Flink提供了复杂的状态管理,KTable将是具有键控状态的常规操作员。
例如,具有两个输入的有状态运算符存储从第一个输入观察到的最新值,并将其与来自第二个输入的值连接,可以用CoFlatMapFunction
实现,如下所示:
DataStream<Tuple2<Long, String>> first = ...
DataStream<Tuple2<Long, String>> second = ...
DataStream<Tuple2<String, String>> result = first
// connect first and second stream
.connect(second)
// key both streams on the first (Long) attribute
.keyBy(0, 0)
// join them
.flatMap(new TableLookup());
// ------
public static class TableLookup
extends RichCoFlatMapFunction<Tuple2<Long,String>, Tuple2<Long,String>, Tuple2<String,String>> {
// keyed state
private ValueState<String> lastVal;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<String> valueDesc =
new ValueStateDescriptor<String>("table", Types.STRING);
lastVal = getRuntimeContext().getState(valueDesc);
}
@Override
public void flatMap1(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
// update the value for the current Long key with the String value.
lastVal.update(value.f1);
}
@Override
public void flatMap2(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
// look up latest String for current Long key.
String lookup = lastVal.value();
// emit current String and looked-up String
out.collect(Tuple2.of(value.f1, lookup));
}
}
一般来说,state可以非常灵活地与Flink一起使用,让我们实现广泛的用例。还有更多的状态类型,如ListState
和MapState
,对于ProcessFunction
,您可以对时间进行细粒度控制,例如,如果某个键在一定时间内没有更新,则可以删除该键的状态(据我所知,KTables有相应的配置)。