我在我的flink代码中使用RocksDB作为状态后端来实现增量检查点,但我想知道增量检查点是否正在发生我的意思是,有没有方法检查日志或flink仪表板,无论它是执行增量检查点还是完整检查点
- 根据flink文档,我使用的是flink 1.10.0版本。我看到flink 1.10.0版本中的日志记录机制被禁用。我遵循这个Ververica链接启用了RocksDB日志记录。下面是我使用的启用日志记录的代码
import static org.apache.flink.configuration.ConfigOptions.key;
import java.util.Collection;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
public class DefaultConfigurableOptionsFactoryWithLog extends DefaultConfigurableOptionsFactory {
private static final long serialVersionUID = 1L;
private String dbLogDir = "";
@Override
public DBOptions createDBOptions(DBOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
currentOptions = super.createDBOptions(currentOptions, handlesToClose);
currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
currentOptions.setStatsDumpPeriodSec(60);
currentOptions.setDbLogDir(dbLogDir);
return currentOptions;
}
@Override
public String toString() {
return this.getClass().toString() + "{" + super.toString() + '}';
}
/**
* Set directory where RocksDB writes its info LOG file (empty = data dir, otherwise the
* data directory's absolute path will be used as the log file prefix).
*/
public void setDbLogDir(String dbLogDir) {
this.dbLogDir = dbLogDir;
}
public static final ConfigOption<String> LOG_DIR =
key("state.backend.rocksdb.log.dir")
.stringType()
.noDefaultValue()
.withDescription("Location of RocksDB's info LOG file (empty = data dir, otherwise the " +
"data directory's absolute path will be used as the log file prefix)");
@Override
public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
DefaultConfigurableOptionsFactory optionsFactory =
super.configure(configuration);
this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
return optionsFactory;
}
我在代码中做了以下设置以启用日志
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(interval);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
RocksDBStateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath,true);
DefaultConfigurableOptionsFactoryWithLog options = new DefaultConfigurableOptionsFactoryWithLog();
options.setDbLogDir("file:///mnt/flink/storage/rocksdb/logging/");
env.setStateBackend(stateBackend);
stateBackend.setRocksDBOptions(options);
我在我的flink配置文件中添加了以下2个设置,以启用RocksDB日志
state.backend.rocksdb.log.dir: "file:///mnt/flink/storage/rocksdb/logging/"
state.backend.rocksdb.options-factory: com.myflinkcode.common.config.DefaultConfigurableOptionsFactoryWithLog
我浏览了完整的flink仪表板,但我不知道如何检查增量检查点是否正在发生或完全检查点是否在发生。请帮助我如何设置RocksDB的日志记录,以了解增量检查点是否正在发生。我在文档中看到RocksDB日志记录将导致性能和存储方面的巨大成本,这是为了测试目的,之后我将禁用
我不确定这些信息是否被记录或显示在任何地方,但在您的代码中,您可以使用
stateBackend.isIncrementalCheckpointsEnabled()
以确定RocksDB状态后端是否启用了检查点,然后自己记录这些信息。
请注意,要启用增量检查点(默认情况下是关闭的(,您需要配置
state.backend.incremental: true