如何连接两个kafka流在flink中(一个是运行的,第二个是静态的,只有很少的记录,像一个主表)



我想用第二个流的帮助来丰富我的第一个流,就像流动的记录不断地与第二个流连接,就像一个查找,我想永远保持在内存中,就像一个表。我可以使用的任何代码示例或任何flink API都适合这个用例。

您可以在Ververica培训页面中找到具有共享状态的连接流的示例:https://training.ververica.com(有状态流处理,幻灯片13)

public static class ControlFunction extends KeyedCoProcessFunction<String, String, String, String> {
private ValueState<Boolean> blocked;

@Override
public void open(Configuration config) {
blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
​
@Override
public void processElement1(String controlValue, Context context, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}

@Override
public void processElement2(String dataValue, Context context, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(dataValue);
}
}
}

public class StreamingJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
DataStream<String> data = env
.fromElements("Flink", "DROP", "Forward", "IGNORE")
.keyBy(x -> x);

control
.connect(data)
.process(new ControlFunction())
.print();

env.execute();
}
}

在您的情况下,您需要将第二个流的内容保持在KeyedCoProcessFunction状态,并从该状态读取第一个流以将其与其元素连接起来。你需要考虑如何为你的流设置键以及使用什么样的状态,但这将是主要的想法。

相关内容

  • 没有找到相关文章

最新更新