Flink:如何在任务管理器中设置系统属性



我有一些来自kafka的代码读取消息,如下所示:

def main(args: Array[String]): Unit = {
    System.setProperty("java.security.auth.login.config", "someValue")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val consumerProperties = new Properties()
    consumerProperties.setProperty("security.protocol", "SASL_PLAINTEXT")
    consumerProperties.setProperty("sasl.mechanism", "PLAIN")
    val kafkaConsumer = new FlinkKafkaConsumer011[ObjectNode](consumerProperties.getProperty("topic"), new JsonNodeDeserializationSchema, consumerProperties)
    val stream = env.addSource(kafkaConsumer)
}

当源尝试从 Apache Kafka 读取消息时,org.apache.kafka.common.security.JaasContext.defaultContext 函数将加载"java.security.auth.login.config"属性。

但是该属性仅在作业管理器中

设置,当我的作业运行时,该属性无法在任务管理器中正确加载,因此源将失败。

我尝试设置额外的JVM_OPTS,例如"-Dxxx=yyy",但 flink 集群以独立模式部署,环境变量不能经常更改。

有什么方法可以在任务管理器中设置属性吗?

Flink 独立集群的文件bin/config.sh包含一个名为 DEFAULT_ENV_JAVA_OPTS 的属性。

此外,如果您export $JVM_ARGS="your parameters"文件bin/config.sh将使用以下行加载它:

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi

相关内容

  • 没有找到相关文章

最新更新