我正试图使用保存点停止我的作业,然后使用相同的保存点重新启动它。对于我的案例,我更新了我的作业,并用新的jar为它创建了新的版本。这是我的代码示例;
class Reader(bla bla) {
def read() = {
val ds = readFromKafka()
transform(ds)
}
def transform(ds: DataStream[]) = {
ds.map()
}
}
object MyJob {
def run () = {
val data = new Reader().read()
data.keyBy(id).process(new MyStateFunc).uid("my-uid") // then write to kafka
}
}
在这种情况下,我确实使用保存点停止了作业,然后使用相同的保存点和相同的jar启动它。然后,我像这样给我的阅读器添加了一个过滤器;
class Reader(bla bla) {
def read() = {
val ds = readFromKafka()
transform(ds)
}
def transform(ds: DataStream[]) = {
ds.map().filter() // FILTER ADDED HERE
}
}
我用保存点停止我的工作,它有效。然后,我尝试使用相同的保存点部署具有新版本(新过滤器方法(的作业,它无法匹配运算符,并且作业不部署。为什么?
除非在获取保存点之前为所有有状态运算符显式提供UID,否则在更改作业拓扑后,Flink将无法确定保存点中的哪个状态属于哪个运算符。
我看到您的键控进程函数上有一个UID("myuid"(。但是,您还需要在Kafka源和接收器上拥有UID,以及其他任何有状态的东西。这些UID需要附加到有状态运算符本身,并且需要在作业中是唯一的(但不是在所有作业中(。(此外,每个状态描述符都需要为每个状态分配一个名称,使用操作员内部唯一的名称。(
通常情况下,一个人会做类似的事情
env
.addSource(...)
.name("KafkaSource")
.uid("KafkaSource")
results.addSink(...)
.name("KafkaSink")
.uid("KafkaSink")
其中CCD_ 1方法用于提供出现在web UI中的文本。