我连接了两个流,然后调用process
来实现我的逻辑以获得结果。下面是我的Flink代码的流程。
SingleOutputStreamOperator<LifetimeIndex> autoEncodedRulStream = dividedStream.getSideOutput(autoEncodedRULModelTag)
.keyBy(a -> a.getModelName() + "-" + a.getParameterId(), TypeInformation.of(new TypeHint<String>() {
}))
.process(AssetParameterBundler.create())
.connect(eventStream)
.keyBy(a -> a.getAssetId() + "-" + a.getModelName(),b -> b.toString())
.process(new Simulator())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LifetimeIndex>(Time.milliseconds(0L)) {
@Override
public long extractTimestamp(LifetimeIndex element) {
return element.getTime().getTime();
}
});
在simulator()
类中,我扩展了Coprocessfunction
,以便处理两个不同的流,即eventStream和mainStream。然而,即使我更新了processElement2
中的状态,simulator()
类中的状态变量也总是返回NULL
。
下面是simulator()
类的逻辑。正如您所看到的,没有特殊的逻辑,但当新数据进入事件流时,会触发新逻辑(但它总是null,因此if语句没有调用(。
public class AutoEncoderSimulator extends CoProcessFunction<BundledAssetParameters, String, LifetimeIndex> {
// private transient MapState<String,String> mapState;
private transient ValueState<String> state;
private int numOfdataPoints = 0;
private List<double[]> trainElementList;
private List<Double> estimatedThresholdList;
private ApmAutoencoder autoencoder;
private double rulThreshold;
private double trainDataPoint;
private double lastHealthIndex;
private AutoUpdate autoUpdate;
@Override
public void processElement1(BundledAssetParameters value, Context ctx, Collector<LifetimeIndex> out) throws Exception {
LazyObject body = new LazyObject(value.getSpecifications().getModelOptions());
rulThreshold = body.getDouble("rulThreshold");
trainDataPoint = body.getDouble("trainDataPoint");
numOfdataPoints = numOfdataPoints + 1;
String event = state.value();
trainElementList.add(value.getValues());
//EVENT IS ALWAYS NULL!
if (event != null) {
}
if (numOfdataPoints == trainDataPoint) {
double[][] trainElement = ArrayUtils.toArray(new double[trainElementList.size()][]);
for(int i=0;i<numOfdataPoints;++i){
trainElement[i] = trainElementList.get(i);
}
AutoEncoderConfig autoEncoderConfig = new AutoEncoderConfig(trainElement.length,trainElement[0].length);
autoencoder = new ApmAutoencoder(autoEncoderConfig);
autoencoder.train(trainElement, 200);
//auto update
AutoUpdate autoUpdate = new AutoUpdate();
double firstThreshold = autoUpdate.firstThreshold(autoencoder.getHealthIndex(trainElement), 10, 0.995);
estimatedThresholdList.add(firstThreshold);
} else if (numOfdataPoints > trainDataPoint) {
double[] values = value.getValues();
}
}
@Override
public void processElement2(String value, Context ctx, Collector<LifetimeIndex> out) throws Exception {
state.update(value);
}
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>(
"1",
TypeInformation.of(new TypeHint<String>() {}));
state = getRuntimeContext().getState(descriptor);
trainElementList = new ArrayList<>();
}
public AutoEncoderSimulator create(){
return new AutoEncoderSimulator();
}
}
如果你能解决我的问题,我将不胜感激。
您可能忽略了一件事,那就是这是键分区状态——所以state
变量不是String,而是一个引用分布式键/值存储的句柄(在这种情况下,键和值是String(。
当您在processElement2
中调用state.update(value)
时,该哈希表中上下文中键(当前事件的键(的条目将更新。当稍后在processElement1
中调用state.value()
时,您确定上下文中存在相同的密钥吗?
由于两个连接的流处于共享状态,因此必须以兼容的方式对两个流进行键控。我看到两个流都是由字符串键控的,但不清楚这些字符串是否来自同一个键空间。modelName + parameterId
似乎不太可能等于assetId + modelName
。
Flink培训网站上有一个关于这种模式的简单例子,你可能会发现它很有用。