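I am using the broadcast state pattern in Flink. I am trying to connect two streams: one is a control stream of rules, and the other is a stream of integers (just dummy data to play with).
I have the following Rule class: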
import java.util.Objects;

public class Rule {
    String id;
    int val;
    RuleType ruleType;
    //Newly added field
    //int val2 = 0;

    public Rule() {}

    public Rule(String id, int val, RuleType ruleType) {
        this.id = id;
        this.val = val;
        this.ruleType = ruleType;
        //this.val2 = val2;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getVal() {
        return val;
    }

    public void setVal(int val) {
        this.val = val;
    }

    public RuleType getRuleType() {
        return ruleType;
    }

    public void setRuleType(RuleType ruleType) {
        this.ruleType = ruleType;
    }

    //public int getVal2() {
    //    return val2;
    //}

    //public void setVal2(int val2) {
    //    this.val2 = val2;
    //}

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Rule rule = (Rule) o;
        return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, val, ruleType);
    }

    @Override
    public String toString() {
        return "Rule{" +
                "name='" + id + '\'' +
                ", val=" + val +
                ", ruleType=" + ruleType +
                '}';
    }
}
This is the RuleType class:
public enum RuleType {
    X,
    Y,
    Z
}
In the BroadcastState, I store a List<Rule> ruleList. I tried the following steps to check whether schema evolution works for this, as mentioned in the docs:
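1. Start the Flink cluster.
2. Submit the job jar.
3. Take a savepoint using the flink savepoint <jobId> command.
4. Stop the job.
5. Modify the code, adding the int field val2 to the Rule class as shown above.
6. Create a new jar.
7. Try to restore the job using the flink run -s <savepoint> command.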
In this case, the job fails to restart because schema evolution fails, with the following error:
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
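Can someone help with this? The KryoException in the trace makes me suspect that, for some reason, the POJO serializer is not being used for the Rule class, but I don't understand why. It satisfies all the conditions for a POJO.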
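For anyone who wants to double-check the "satisfies all the conditions" part without starting a cluster, here is a rough, Flink-free sketch of the POJO rules as the Flink docs describe them: public class, public no-argument constructor, and every non-static, non-transient field either public and non-final or reachable through a public getter and setter. The names PojoRuleCheck, pojoProblems, hasAccessors and NoSetter are made up for this illustration, and the nested Rule is a trimmed stand-in for the class above. It only looks at the shape of the class; Flink's own analysis is more detailed, and this says nothing about how the enclosing List<Rule> state type is declared.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRuleCheck {

    // Trimmed stand-ins for the classes in the question (illustration only).
    public enum RuleType { X, Y, Z }

    public static class Rule {
        String id;
        int val;
        RuleType ruleType;
        public Rule() {}
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public int getVal() { return val; }
        public void setVal(int val) { this.val = val; }
        public RuleType getRuleType() { return ruleType; }
        public void setRuleType(RuleType ruleType) { this.ruleType = ruleType; }
    }

    // Hypothetical counter-example: private field with a getter but no setter.
    public static class NoSetter {
        private int x;
        public NoSetter() {}
        public int getX() { return x; }
    }

    /** Returns a list of rule violations; an empty list means the basic rules are met. */
    public static List<String> pojoProblems(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        int classMods = clazz.getModifiers();
        if (!Modifier.isPublic(classMods)) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(classMods)) {
            problems.add("class is a non-static inner class");
        }
        try {
            clazz.getConstructor(); // finds public constructors only
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        // Walk the class and its superclasses, skipping static/transient fields.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int m = f.getModifiers();
                if (Modifier.isStatic(m) || Modifier.isTransient(m) || f.isSynthetic()) {
                    continue;
                }
                if (Modifier.isPublic(m) && !Modifier.isFinal(m)) {
                    continue; // directly accessible field
                }
                if (!hasAccessors(clazz, f)) {
                    problems.add("field '" + f.getName() + "' has no public getter/setter pair");
                }
            }
        }
        return problems;
    }

    // Simplified accessor lookup: setXxx(fieldType) plus getXxx() or isXxx().
    private static boolean hasAccessors(Class<?> clazz, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            clazz.getMethod("set" + suffix, f.getType());
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (String prefix : new String[] {"get", "is"}) {
            try {
                clazz.getMethod(prefix + suffix);
                return true;
            } catch (NoSuchMethodException ignored) {
                // try the next prefix
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("Rule: " + pojoProblems(Rule.class));
        System.out.println("NoSetter: " + pojoProblems(NoSetter.class));
    }
}
```

If Rule passes a check like this and Kryo still shows up in the stack trace, the class shape itself is probably not the culprit, which is consistent with the type information having to be supplied explicitly.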
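This is the answer I found after a lot of research. The Flink docs are really thin on examples, so this may help others who run into the same problem.
I can create my own TypeInfoFactory for this class, as follows: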
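// Imports needed by this snippet
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Declared as a static nested class inside an enclosing class
public static class MyPojoTypeInfoFactoryForRule extends TypeInfoFactory<Rule> {
    @Override
    public TypeInformation<Rule> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Describe every field of Rule explicitly, including the new val2
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("id", Types.STRING);
        fields.put("val", Types.INT);
        fields.put("ruleType", Types.ENUM(RuleType.class));
        fields.put("val2", Types.INT);
        return Types.POJO(Rule.class, fields);
    }
}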
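Then annotate the Rule class with this factory so that Rule is serialized as a POJO. The question that remains unanswered is how to make this better: can I write a factory just for the enum class instead of for the whole Rule class?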
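For completeness, the annotation step could look roughly like the fragment below. It uses Flink's @TypeInfo annotation and assumes MyPojoTypeInfoFactoryForRule is visible from where Rule is declared (for example via an import of its enclosing class, which is not shown here). It also assumes val2 is now a real field with a getter and setter, since the factory lists it. This is a fragment, not a complete file: the remaining members are the ones from the question.

```java
import org.apache.flink.api.common.typeinfo.TypeInfo;

// Assumption: MyPojoTypeInfoFactoryForRule is importable/visible here.
@TypeInfo(MyPojoTypeInfoFactoryForRule.class)
public class Rule {
    String id;
    int val;
    RuleType ruleType;
    int val2 = 0; // the newly added field, listed in the factory's field map

    public Rule() {}

    public int getVal2() {
        return val2;
    }

    public void setVal2(int val2) {
        this.val2 = val2;
    }

    // Remaining constructor, getters, setters, equals, hashCode and toString
    // as in the question.
}
```

With the annotation in place, Flink should ask the factory for the type information whenever it encounters Rule, instead of analyzing the class on its own.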