Flink version 1.6.1
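In the example below, I want to connect two non-keyed streams, but the two streams don't seem to share state properly. I don't know the right way to do this.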
Code:
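import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class TransactionJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream1 = env.fromElements("1", "2");
        DataStream<Integer> stream2 = env.fromElements(3, 4, 5);
        ConnectedStreams<String, Integer> connectedStreams = stream1.connect(stream2);
        DataStream<String> resultStream = connectedStreams.process(new StringIntegerCoProcessFunction());
        resultStream.print().setParallelism(1);
        env.execute();
    }

    private static class StringIntegerCoProcessFunction extends CoProcessFunction<String, Integer, String> implements CheckpointedFunction {
        // Operator state, registered in initializeState(): one copy per parallel instance of this function.
        private transient ListState<String> state1;
        private transient ListState<Integer> state2;

        @Override
        public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
            state1.add(value);
            print(value);
        }

        @Override
        public void processElement2(Integer value, Context ctx, Collector<String> out) throws Exception {
            state2.add(value);
            print(value.toString());
        }

        // Prints the current input together with this instance's view of both states.
        private void print(String value) throws Exception {
            StringBuilder builder = new StringBuilder();
            builder.append("input value is " + value + ".");
            builder.append("state1 has ");
            for (String str : state1.get()) {
                builder.append(str + ",");
            }
            builder.append("state2 has ");
            for (Integer integer : state2.get()) {
                builder.append(integer.toString() + ",");
            }
            System.out.println(builder.toString());
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Nothing to do: ListState obtained from the operator state store is snapshotted automatically.
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<String> descriptor1 =
                    new ListStateDescriptor<>(
                            "state1",
                            TypeInformation.of(new TypeHint<String>() {
                            }));
            ListStateDescriptor<Integer> descriptor2 =
                    new ListStateDescriptor<>(
                            "state2",
                            TypeInformation.of(new TypeHint<Integer>() {
                            }));
            state1 = context.getOperatorStateStore().getListState(descriptor1);
            state2 = context.getOperatorStateStore().getListState(descriptor2);
        }
    }
}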
Output:
input value is 4.state1 has state2 has 4,
input value is 2.state1 has 2,state2 has 4,
input value is 3.state1 has state2 has 3,
input value is 1.state1 has 1,state2 has 3,
input value is 5.state1 has state2 has 5,
I expected the last line of output to be
input value is XX .state1 has 1,2 state2 has 3,4,5
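But the output actually looks as if the input items were partitioned: 4 and 2 are in one partition, 3 and 1 in another, and 5 in a third. I want to be able to access all of the data stored in state1 and state2 from both processElement1 and processElement2.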
You should change the start of the job as follows:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
...
This makes the whole job run with parallelism 1. You did have
resultStream.print().setParallelism(1);
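but that only sets the print sink to parallelism 1; the CoProcessFunction still runs with the default parallelism, which here is evidently greater than 1. Because state1 and state2 come from getOperatorStateStore(), they are operator state: each parallel instance of the function keeps its own copy and only sees the elements routed to that instance.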
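To illustrate why the values end up split, here is a simplified, hypothetical model in plain Java. It is not Flink code: the class OperatorStateModel, its Subtask type, and the round-robin cursors that both start at subtask 0 are assumptions made for illustration. Each input is spread over N subtasks, and every subtask owns private copies of state1 and state2, the way operator ListState is held per parallel instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model (not Flink code) of a non-keyed CoProcessFunction
// running with parallelism N: each input is spread round-robin over the subtasks,
// and every subtask owns a private copy of state1/state2 (like operator ListState).
public class OperatorStateModel {

    /** The private state of one parallel subtask. */
    static final class Subtask {
        final List<String> state1 = new ArrayList<>();
        final List<Integer> state2 = new ArrayList<>();

        @Override
        public String toString() {
            return "state1=" + state1 + " state2=" + state2;
        }
    }

    /** Distributes both inputs over `parallelism` subtasks and returns their states. */
    static List<Subtask> run(List<String> input1, List<Integer> input2, int parallelism) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
        // Each input has its own round-robin cursor starting at subtask 0.
        // That starting point is an assumption of this model, not a statement about Flink.
        for (int i = 0; i < input1.size(); i++) {
            subtasks.get(i % parallelism).state1.add(input1.get(i));
        }
        for (int i = 0; i < input2.size(); i++) {
            subtasks.get(i % parallelism).state2.add(input2.get(i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> input1 = Arrays.asList("1", "2");
        List<Integer> input2 = Arrays.asList(3, 4, 5);
        System.out.println("parallelism 4: " + run(input1, input2, 4));
        System.out.println("parallelism 1: " + run(input1, input2, 1));
    }
}
```

With parallelism 4 (an arbitrary choice) the model groups the values as {1, 3}, {2, 4} and {5}, the same grouping as in the question's output, while with parallelism 1 the single subtask holds 1, 2 and 3, 4, 5. Flink's real routing order may differ; the point is only that each parallel instance owns its own operator state.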
Alternatively, you could key both streams by the same constant key and then use keyed state.
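A minimal sketch of that alternative, assuming the Flink 1.6 DataStream API. The class names KeyedTransactionJob and KeyedStringIntegerCoProcessFunction, the constant key 0, and the describe helper are illustrative choices, not from the original. Both inputs are keyed by the same constant, so every element reaches the single parallel instance that owns key 0, and both processElement methods read the same keyed ListState obtained through getRuntimeContext() in open().

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedTransactionJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream1 = env.fromElements("1", "2");
        DataStream<Integer> stream2 = env.fromElements(3, 4, 5);

        // Key both inputs by the same constant (hypothetical choice: 0), so all
        // elements go to the one parallel instance that owns that key.
        stream1
                .keyBy(new KeySelector<String, Integer>() {
                    @Override
                    public Integer getKey(String value) {
                        return 0;
                    }
                })
                .connect(stream2.keyBy(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) {
                        return 0;
                    }
                }))
                .process(new KeyedStringIntegerCoProcessFunction())
                .print();

        env.execute();
    }

    private static class KeyedStringIntegerCoProcessFunction
            extends CoProcessFunction<String, Integer, String> {

        // Keyed state: scoped to the current key, which is always 0 here,
        // so both processElement methods see the same two lists.
        private transient ListState<String> state1;
        private transient ListState<Integer> state2;

        @Override
        public void open(Configuration parameters) {
            state1 = getRuntimeContext().getListState(new ListStateDescriptor<>("state1", String.class));
            state2 = getRuntimeContext().getListState(new ListStateDescriptor<>("state2", Integer.class));
        }

        @Override
        public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
            state1.add(value);
            out.collect(describe(value, state1.get(), state2.get()));
        }

        @Override
        public void processElement2(Integer value, Context ctx, Collector<String> out) throws Exception {
            state2.add(value);
            out.collect(describe(value.toString(), state1.get(), state2.get()));
        }
    }

    /** Builds the same message format as the question's print(...) helper. */
    static String describe(String value, Iterable<String> s1, Iterable<Integer> s2) {
        StringBuilder builder = new StringBuilder("input value is " + value + ".state1 has ");
        if (s1 != null) { // defensive null check for empty state
            for (String str : s1) {
                builder.append(str).append(",");
            }
        }
        builder.append("state2 has ");
        if (s2 != null) {
            for (Integer i : s2) {
                builder.append(i).append(",");
            }
        }
        return builder.toString();
    }
}
```

As with env.setParallelism(1), a constant key funnels all data through a single instance, so this only suits low-volume streams; the rest of the job can still run in parallel.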