RichFlatMap中的状态管理(带或不带keyBy)



我有一个像这样的流应用程序:

DataStream<MyObject> stream1 = source
.keyBy("clientip")
.flatMap(new MyFlatMapFunction())
.name("Stream1");
//...
public class MyFlatMapFunction extends RichFlatMapFunction<MyObject, MyObject> {
private transient ValueState<Boolean> valueState;
@Override
public void open(Configuration parameters)
{
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(12))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).cleanupInBackground()
.build();
ValueStateDescriptor<Boolean> valueStateeDescriptor = new ValueStateDescriptor<>(
"valueState",
Types.BOOLEAN);
valueStateeDescriptor.enableTimeToLive(ttlConfig);
valueState = getRuntimeContext().getState(valueState);
}
@Override
public void flatMap(MyObject myObject, Collector<MyObject> collector) throws Exception
{
// get value from value state, check if it is matched with something
// if matches some condition, then collector.collect(myObject)
// update state for each myObject
}
}

不是:有3个工人在不同的3台机器上,有16个平行度。总并行度为48。

当我实现这个代码时;如果ip地址1.2.3.4与条件匹配,则来自同一ip地址1.2.3.4.的后续请求总是与条件匹配直到状态被清除";。这个说法正确吗?

我从flink文档中了解到,如果ip地址1.2.3.4转到machine1(通过生成clientip的哈希值(,那么所有来自ip地址1.2.3.3的请求总是转到machine1?

open()方法在任务管理器jvm内部调用一次。因此,flink创建了48个flatMapOperation实例(48个实例中的1-15个位于机器1中,48个实例的16-32个位于机器2中,48个中的33-48个位于机器3中(,每个flatMapInstance都将运行open方法。这意味着开放式方法要跑48次?

最后,所有48个实例都访问相同的状态,但访问不同的值(因为状态是本地的(。我的意思是,一部分实例组(假设机器1上的16个实例(将获得相同的状态值。

最后,如果在FlatMap之前没有keyBy,那么来自ip地址1.2.3.4的请求是否可以以随机方式进入机器1、机器2或机器3?

  1. 由于执行keyBy("clientip"),因此该字段具有相同值的所有记录都将由相同的MyFlatMapFunction子任务处理。因此,所有记录的集合被划分为48个子任务,假设IP地址的计数均匀分布,每个子任务将获得大约所有记录的1/48
  2. 是的,将有48个MyFlatMapFunction实例被实例化,因此有48个对open()的调用
  3. all of 48 instances access the same state。否,状态是每个唯一的键,因此状态是按键值在48个子任务中划分的
  4. 如果没有keyBy(),那么MyFlatMapFunction操作符的每个子任务将从源获取分区中的任何数据。这取决于您的数据源,例如,如果您正在阅读Kafka主题,并且该主题有48个分区,那么从Kafka分区到MyFlatMapFunction子任务有一个1对1的映射。如果您的Kafka分区少于48个,则某些MyFlatMapFunction子任务将无法获得任何数据。如果您想将传入的记录重新分配给所有子任务,那么您可以执行rebalance()。但请注意,您将无法维护每个IP地址的状态

相关内容

  • 没有找到相关文章

最新更新