我有一个像这样的流应用程序:
DataStream<MyObject> stream1 = source
.keyBy("clientip")
.flatMap(new MyFlatMapFunction())
.name("Stream1");
//...
public class MyFlatMapFunction extends RichFlatMapFunction<MyObject, MyObject> {
private transient ValueState<Boolean> valueState;
@Override
public void open(Configuration parameters)
{
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(12))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).cleanupInBackground()
.build();
ValueStateDescriptor<Boolean> valueStateeDescriptor = new ValueStateDescriptor<>(
"valueState",
Types.BOOLEAN);
valueStateeDescriptor.enableTimeToLive(ttlConfig);
valueState = getRuntimeContext().getState(valueState);
}
@Override
public void flatMap(MyObject myObject, Collector<MyObject> collector) throws Exception
{
// get value from value state, check if it is matched with something
// if matches some condition, then collector.collect(myObject)
// update state for each myObject
}
}
不是:有3个工人在不同的3台机器上,有16个平行度。总并行度为48。
当我实现这个代码时;如果ip地址1.2.3.4与条件匹配,则来自同一ip地址1.2.3.4.的后续请求总是与条件匹配直到状态被清除";。这个说法正确吗?
我从flink文档中了解到,如果ip地址1.2.3.4转到machine1(通过生成clientip的哈希值(,那么所有来自ip地址1.2.3.3的请求总是转到machine1?
open()
方法在任务管理器jvm内部调用一次。因此,flink创建了48个flatMapOperation实例(48个实例中的1-15个位于机器1中,48个实例的16-32个位于机器2中,48个中的33-48个位于机器3中(,每个flatMapInstance都将运行open方法。这意味着开放式方法要跑48次?
最后,所有48个实例都访问相同的状态,但访问不同的值(因为状态是本地的(。我的意思是,一部分实例组(假设机器1上的16个实例(将获得相同的状态值。
最后,如果在FlatMap之前没有keyBy,那么来自ip地址1.2.3.4的请求是否可以以随机方式进入机器1、机器2或机器3?
- 由于执行
keyBy("clientip")
,因此该字段具有相同值的所有记录都将由相同的MyFlatMapFunction
子任务处理。因此,所有记录的集合被划分为48个子任务,假设IP地址的计数均匀分布,每个子任务将获得大约所有记录的1/48 - 是的,将有48个
MyFlatMapFunction
实例被实例化,因此有48个对open()
的调用 all of 48 instances access the same state
。否,状态是每个唯一的键,因此状态是按键值在48个子任务中划分的- 如果没有
keyBy()
,那么MyFlatMapFunction
操作符的每个子任务将从源获取分区中的任何数据。这取决于您的数据源,例如,如果您正在阅读Kafka主题,并且该主题有48个分区,那么从Kafka分区到MyFlatMapFunction
子任务有一个1对1的映射。如果您的Kafka分区少于48个,则某些MyFlatMapFunction
子任务将无法获得任何数据。如果您想将传入的记录重新分配给所有子任务,那么您可以执行rebalance()
。但请注意,您将无法维护每个IP地址的状态