检查点时的 CEP 问题。"Could not find id for entry"



打开检查点时简单的CEP循环模式

 private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
        .followedBy("middle").where(checkStatusOn).times(2)
        .next("end").where(checkStatusOn).within(Time.minutes(5))

我看到失败。

SimpleBinaryEvent是

public class SimpleBinaryEvent implements Serializable {
private int id;
private int sequence;
private boolean status;
private long time;
public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
    this.id = id;
    this.sequence = sequence;
    this.status = status;
    this.time = time;
}
public int getId() {
    return id;
}
public int getSequence() {
    return sequence;
}
public boolean isStatus() {
    return status;
}
public long getTime() {
    return time;
}
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    SimpleBinaryEvent that = (SimpleBinaryEvent) o;
    if (getId() != that.getId()) return false;
    if (isStatus() != that.isStatus()) return false;
    if (getSequence() != that.getSequence()) return false;
    return getTime() == that.getTime();
}
@Override
public int hashCode() {
    //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
    int result = getId();
    result = 31 * result + (isStatus() ? 1 : 0);
    result = 31 * result + getSequence();
    result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
    return result;
}
@Override
public String toString() {
    return "SimpleBinaryEvent{" +
            "id='" + id + ''' +
            ", status=" + status +
            ", sequence=" + sequence +
            ", time=" + time +
            '}';
}

}

失败原因:

Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....

我确定我有Equals()和HashCode()可以按照应有的方式实现。我也尝试了对象。在其他情况下,我在sharedBuffer.toString()上曾在coundularReference上(以及stackoverflow),这再次指出了引用(平等和没有)的问题。无需检查点即可按预期工作。我正在当地集群上运行。CEP生产准备好了吗?

我正在使用1.3.2 flink

非常感谢您尝试了图书馆并报告此图书馆!

随着越来越多的功能添加了库。1.3是图书馆的第一个发行版,具有如此丰富的语义,因此我们希望看到1)人们如何使用它,以及2)如果有任何错误。所以我想说的不是100%的生产已经准备就绪,但不远。

现在,对于手头的问题,我想您正在使用RockSDB进行检查点,对吗?我假设我的原因是,在每个水印(在事件时间)中,您可以将必要的状态(例如NFA)进行处理,然后对一些事件进行处理,然后再次将其序列化,然后再将其放回岩石中。

对于文件系统状态后端而言,情况并非如此,在该状态下,您只在检查点时序列化状态,然后才能阅读并仅在恢复时将其序列化。因此,在这种情况下,鉴于您说过没有检查点的工作正常工作,只有从失败中恢复后才看到此问题。

问题的根可以是 equals()/hashcode()是错误的(似乎并非如此),或者在我们序列化/对CEP状态的方式上存在问题。

您还可以提供最小的输入序列,以产生这种情况吗?为了重现问题,这真的很有帮助。

非常感谢,kostas

相关内容

  • 没有找到相关文章

最新更新