如何在 Flink 中连接两个未加密的流并相互共享状态



Flink 版本 1.6.1

在下面的示例中,我想连接两个未加密的流。但似乎这两个流无法正确共享状态。我不知道实现它的正确方法是什么。

法典:

public class TransactionJob {
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> stream1 = env.fromElements("1", "2");
    DataStream<Integer> stream2 = env.fromElements(3, 4, 5);
    ConnectedStreams<String, Integer> connectedStreams = stream1.connect(stream2);
    DataStream<String> resultStream = connectedStreams.process(new StringIntegerCoProcessFunction());
    resultStream.print().setParallelism(1);
    env.execute();
}
private static class StringIntegerCoProcessFunction extends CoProcessFunction<String, Integer, String> implements CheckpointedFunction {
    private transient ListState<String> state1;
    private transient ListState<Integer> state2;
    @Override
    public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
        state1.add(value);
        print(value);
    }
    @Override
    public void processElement2(Integer value, Context ctx, Collector<String> out) throws Exception {
        state2.add(value);
        print(value.toString());
    }
    private void print(String value) throws Exception {
        StringBuilder builder = new StringBuilder();
        builder.append("input value is " + value + ".");
        builder.append("state1 has ");
        for (String str : state1.get()) {
            builder.append(str + ",");
        }
        builder.append("state2 has ");
        for (Integer integer : state2.get()) {
            builder.append(integer.toString() + ",");
        }
        System.out.println(builder.toString());
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
    }
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor1 =
                new ListStateDescriptor<>(
                        "state1",
                        TypeInformation.of(new TypeHint<String>() {
                        }));
        ListStateDescriptor<Integer> descriptor2 =
                new ListStateDescriptor<>(
                        "state2",
                        TypeInformation.of(new TypeHint<Integer>() {
                        }));
        state1 = context.getOperatorStateStore().getListState(descriptor1);
        state2 = context.getOperatorStateStore().getListState(descriptor2);
    }
}

}

输出:

input value is 4.state1 has state2 has 4,
input value is 2.state1 has 2,state2 has 4,
input value is 3.state1 has state2 has 3,
input value is 1.state1 has 1,state2 has 3,
input value is 5.state1 has state2 has 5,

我希望最后一块输出将是

input value is XX .state1 has 1,2 state2 has 3,4,5

但实际上输出看起来像是输入项已分区。 4 和 2 在一个分区中,3 和 1 在另一个分区中。我想访问存储在 processElement1processElement2 的状态 1 和状态 2 中的所有数据。

您应该修改作业的开头,如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
...

这将导致整个作业以并行度 1 运行。你确实有

resultStream.print().setParallelism(1);

具有将打印接收器设置为并行度为 1 的效果,但作业的其余部分以默认并行度运行,该并行度显然大于 1。

或者,可以通过相同的常量键对两个流进行键控,然后使用键控状态。

相关内容

  • 没有找到相关文章

最新更新