Flink 获取 keyedState 状态值并在另一个流中使用



我知道键状态属于它的键,只有当前键访问它的状态值,其他键无法访问不同键的状态值。

我尝试使用相同的密钥但在不同的流中访问状态。可能吗?

如果不可能,那么我将有 2 个重复数据?

不是:我需要两个流,因为它们中的每一个都有不同的时间窗口和不同的实现。

这是一个例子(我知道keyBy(sommething(对于两个流操作是相同的(:

public class Sample{
streamA
.keyBy(something)
.timeWindow(Time.seconds(4))
.process(new CustomMyProcessFunction())
.name("CustomMyProcessFunction")
.print();
streamA
.keyBy(something)
.timeWindow(Time.seconds(1))
.process(new CustomMyAnotherProcessFunction())
.name("CustomMyProcessFunction")
.print();
}
public class CustomMyProcessFunction extends ProcessWindowFunction<..>
{
private Logger logger = LoggerFactory.getLogger(CustomMyProcessFunction.class);
private transient ValueState<SimpleEntity> simpleEntityValueState;
private SimpleEntity simpleEntity;
@Override
public void open(Configuration parameters) throws Exception
{
ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
"sample",
TypeInformation.of(SimpleEntity.class)
);
simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
}
@Override
public void process(...) throws Exception
{
SimpleEntity value = simpleEntityValueState.value();
if (value == null)
{
SimpleEntity newVal = new SimpleEntity("sample");
logger.info("New Value put");
simpleEntityValueState.update(newVal);
}
...
}
...
}
public class CustomMyAnotherProcessFunction extends ProcessWindowFunction<..>
{

private transient ValueState<SimpleEntity> simpleEntityValueState;
@Override
public void open(Configuration parameters) throws Exception
{
ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
"sample",
TypeInformation.of(SimpleEntity.class)
);
simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
}
@Override
public void process(...) throws Exception
{
SimpleEntity value = simpleEntityValueState.value();
if (value != null)
logger.info(value.toString()); // I expect that SimpleEntity("sample")
out.collect(...);
}
...
}

正如已经指出的那样,状态始终是单个运算符实例的本地状态。它不能被共享。

但是,您可以做的是将状态更新从持有状态的操作员流式传输到需要它的其他操作员。使用端输出,可以创建复杂的数据流,而无需共享状态。

我尝试了您的想法,即使用相同的键在两个运算符之间共享状态。

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.io.IOException;
public class FlinkReuseState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<Integer> stream1 = env.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
int i = 0;
while (true) {
sourceContext.collect(1);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
});
DataStream<Integer> stream2 = env.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
while (true) {
sourceContext.collect(1);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
});

DataStream<Integer> windowedStream1 = stream1.keyBy(Integer::intValue)
.timeWindow(Time.seconds(3))
.process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
private ValueState<Integer> value;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<Integer>("value", Integer.class);
value = getRuntimeContext().getState(desc);
}
@Override
public void process(Integer integer, Context context, Iterable<Integer> iterable, Collector<Integer> collector) throws Exception {
iterable.forEach(x -> {
try {
if (value.value() == null) {
value.update(1);
} else {
value.update(value.value() + 1);
}
} catch (IOException e) {
e.printStackTrace();
}
});
collector.collect(value.value());
}
});
DataStream<String> windowedStream2 = stream2.keyBy(Integer::intValue)
.timeWindow(Time.seconds(3))
.process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
private ValueState<Integer> value;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<Integer>("value", Integer.class);
value = getRuntimeContext().getState(desc);
}
@Override
public void process(Integer s, Context context, Iterable<Integer> iterable, Collector<String> collector) throws Exception {
iterable.forEach(x -> {
try {
if (value.value() == null) {
value.update(1);
} else {
value.update(value.value() + 1);
}
} catch (IOException e) {
e.printStackTrace();
}
});
collector.collect(String.valueOf(value.value()));
}
});
windowedStream2.print();
windowedStream1.print();
env.execute();
}
}

它不起作用,每个流只更新自己的值状态,输出如下。

3> 3
3> 3
3> 6
3> 6
3> 9
3> 9
3> 12
3> 12
3> 15
3> 15
3> 18
3> 18
3> 21
3> 21
3> 24
3> 24

键控状态

根据官方文档,*每个键状态在逻辑上绑定到唯一的<parallel-operator-instance, key>组合,并且由于每个键"属于"一个键运算符的一个并行实例,我们可以简单地将其视为<operator, key>*

我认为不可能通过为不同运算符中的状态提供相同的名称来共享状态。

你试过协处理器函数吗?通过这样做,您还可以为每个流实现两个进程函数,唯一的问题是时间窗口。您能否提供有关流程逻辑的更多详细信息?

为什么不能将状态作为映射操作的一部分返回,并且该流可用于连接到其他流

相关内容

  • 没有找到相关文章

最新更新