我有一些来自kafka的代码读取消息,如下所示:
def main(args: Array[String]): Unit = {
System.setProperty("java.security.auth.login.config", "someValue")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val consumerProperties = new Properties()
consumerProperties.setProperty("security.protocol", "SASL_PLAINTEXT")
consumerProperties.setProperty("sasl.mechanism", "PLAIN")
val kafkaConsumer = new FlinkKafkaConsumer011[ObjectNode](consumerProperties.getProperty("topic"), new JsonNodeDeserializationSchema, consumerProperties)
val stream = env.addSource(kafkaConsumer)
}
当源尝试从 Apache Kafka 读取消息时,org.apache.kafka.common.security.JaasContext.defaultContext 函数将加载"java.security.auth.login.config"属性。
但是该属性仅在作业管理器中设置,当我的作业运行时,该属性无法在任务管理器中正确加载,因此源将失败。
我尝试设置额外的JVM_OPTS,例如"-Dxxx=yyy",但 flink 集群以独立模式部署,环境变量不能经常更改。
有什么方法可以在任务管理器中设置属性吗?
Flink 独立集群的文件bin/config.sh
包含一个名为 DEFAULT_ENV_JAVA_OPTS
的属性。
此外,如果您export $JVM_ARGS="your parameters"
文件bin/config.sh
将使用以下行加载它:
# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; then
JVM_ARGS=""
fi