Flink-键控过程函数中的Java类成员



我有以下flink keyedprocess函数。我基本上是在尝试实现状态设计模式。

public AlertProcessor extends KeyedProcessFunction<Tuple2<String, String>, Event1, Event2> {
private transient AlertState currentState;
private transient AlertState activeAlertState;
private transient AlertState noActiveAlertState;
private transient AlertState resolvedAlertState;
@Override
public void open(Configuration parameters) {
activeAlertState = new ActiveAlertState();
noActiveAlertState = new NoActiveAlertState();
resolvedAlertState = new ResolvedAlertState();
}

@Override
public processElement(Event1 event1, Context ctx, Collector<Event2> out) throws Exception {
// Would the below if condition work for multiple keys?
if (currentAlertState == null) {
currentAlertState = noActiveAlertState;
}
currentAlertState.handle(event1, out);
}
private interface AlertState {
void handle(Event1 event1, Collector<Event2> out);
} 
private class ActiveAlertState implements AlertState {
void handle(Event1 event1, Collector<Event2> out) {
logger.debug("Moving to no alertState");

// Do something and push some Event2 to out
currentAlertState = resolvedActiveAlertState;
}
}

private class NoActiveAlertState implements AlertState {
void handle(Event1 event1, Collector<Event2> out) {
logger.debug("Moving to no alertState");

// Do something and push some Event2 to out
currentAlertState = activeAlertState;
}
}
private class ResolvedAlertState implements AlertState {

void handle(Event1 event1, Collector<Event2> out) {
logger.debug("Moving to no alertState");

// Do something and push some Event2 to out
currentAlertState = noActiveAlertState;
}
}
}

我的问题是

  1. 流中每个键会有一个AlertProcessor实例(或对象(吗?换句话说,currentAlertState对象是否每个键都是唯一的?或者这个AlertProcessor操作符的每个实例将有一个currentAlertState

如果currentAlertState是运算符的每个实例,那么此代码将不会真正工作,因为currentAlertState将被不同的键覆盖。我的理解正确吗?

  1. 我可以将currentAlertState存储在键控状态,并为每个processElement((调用初始化它。如果我这样做,我就不需要在handle((实现中将currentAlertState分配或设置为下一个状态,因为currentAlertState无论如何都会根据处于flink状态的内容进行初始化。

  2. 有没有更好的方法来实现flink中的状态设计模式,并且仍然减少创建的状态对象的数量?

将在管道的每个并行实例(每个任务槽(中创建一个AlertProcessor实例,并将其复用到该槽处理的所有密钥上。

如果currentAlertState是运算符的每个实例,那么此代码将不会真正工作,因为currentAlertState将被不同的键覆盖。我的理解正确吗?

正确。您应该为currentAlertState使用键控状态,这将在状态后端为每个不同的键生成一个条目。

相关内容

  • 没有找到相关文章

最新更新