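I have a containerized Flink cluster with a standalone JobManager and two TaskManagers. When I submit a fat jar that contains only the connectors I need plus my code, everything works, but a jar with only my code fails, even though I have added the connector jars to /opt/flink/lib and the container processes show them on the classpath. The same failure occurs when I try to connect remotely from the IDE; needless to say, having to bundle a fat jar every time makes for a miserable development experience.
What do I need to do to make Flink understand that the additional jars I placed in /opt/flink/lib should be available to all new jobs?
Classloader exception:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
Dockerfile: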
FROM flink:1.7.2
ADD http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.12/1.7.2/flink-connector-kafka_2.12-1.7.2.jar /opt/flink/lib/
ADD http://central.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.7.2/flink-avro-confluent-registry-1.7.2.jar /opt/flink/lib/
ADD http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6_2.12/1.7.2/flink-connector-elasticsearch6_2.12-1.7.2.jar /opt/flink/lib/
JobManager container:
root@2406b722dae1:/tmp# ps ax | more
PID TTY STAT TIME COMMAND
1 ? Ssl 1:01 /docker-java-home/jre/bin/java -Xms1024m -Xmx1024m -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlogback.configurationFil
e=file:/opt/flink/conf/logback-console.xml -classpath /opt/flink/lib/flink-avro-confluent-registry-1.7.2.jar:/opt/flink/lib/flink-connector-elasticsearch6_2.12-1.7.2.jar:/o
pt/flink/lib/flink-connector-kafka_2.12-1.7.2.jar:/opt/flink/lib/flink-python_2.12-1.7.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/fl
ink/lib/flink-dist_2.12-1.7.2.jar::: org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir /opt/flink/conf --executionMode cluster
TaskManager container:
root@bd1aa6e35b5a:/tmp# ps ax | more
PID TTY STAT TIME COMMAND
1 ? Ssl 0:28 /docker-java-home/jre/bin/java -XX:+UseG1GC -Xms922M -Xmx922M -XX:MaxDirectMemorySize=8388607T -Dlog4j.configuration=file:/opt/flink/conf/log4j-c
onsole.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath /opt/flink/lib/flink-avro-confluent-registry-1.7.2.jar:/opt/flink/lib/flin
k-connector-elasticsearch6_2.12-1.7.2.jar:/opt/flink/lib/flink-connector-kafka_2.12-1.7.2.jar:/opt/flink/lib/flink-python_2.12-1.7.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/op
t/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.7.2.jar::: org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/flink/conf
/opt/flink/lib on the running JobManager and TaskManager:
root@bd1aa6e35b5a:/opt/flink/lib# ls -lh
total 84M
-rw------- 1 root root 2.7M Feb 11 16:25 flink-avro-confluent-registry-1.7.2.jar
-rw------- 1 root root 30K Feb 11 16:21 flink-connector-elasticsearch6_2.12-1.7.2.jar
-rw------- 1 root root 67K Feb 11 16:24 flink-connector-kafka_2.12-1.7.2.jar
-rw-r--r-- 1 flink flink 81M Feb 11 14:50 flink-dist_2.12-1.7.2.jar
-rw-r--r-- 1 flink flink 139K Feb 11 14:49 flink-python_2.12-1.7.2.jar
-rw-rw-r-- 1 flink flink 479K Feb 11 14:32 log4j-1.2.17.jar
-rw-rw-r-- 1 flink flink 9.7K Feb 11 14:32 slf4j-log4j12-1.7.15.jar
Just in case there is any doubt, the class is in the expected jar:
root@bd1aa6e35b5a:/opt/flink/lib# unzip -l flink-connector-kafka_2.12-1.7.2.jar | grep FlinkKafkaConsumer
14272 2019-02-12 00:24 org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.class
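I figured it out. My first problem was that Flink runs as the flink user, and, as the listing shows, the files I added in the Dockerfile were owned by root with mode rw-------, so the flink user could not read them. The problem after that was that sbt assembly bundles all transitive dependencies for you, whereas in the Dockerfile you have to add them by hand (the size difference between the fat jar and the files I added should have given that away).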
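The ownership problem can be spotted before submitting a job. The following is only a minimal sketch: the function name unreadable_jars is made up for illustration, and it assumes the Flink process runs as a user that neither owns the jars nor belongs to their group, so the "other" permission bits decide whether the jar can be read.

```shell
#!/bin/sh
# List jar files directly inside a directory that lack the world-readable bit.
# Assumption: the Flink user is neither the owner nor in the group of these
# files, so a missing "other" read bit means Flink cannot open the jar.
unreadable_jars() {
    dir="$1"
    # -perm -004 matches files whose "other" read bit is set; negate it.
    find "$dir" -maxdepth 1 -type f -name '*.jar' ! -perm -004 | sort
}

# Example (inside the container): unreadable_jars /opt/flink/lib
```

Run against the listing shown in the question, this would be expected to report the three connector jars added by the Dockerfile, since they are the only ones with mode rw-------.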
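I fixed the ownership problem in the Dockerfile and added the transitive dependencies of the Kafka connector (the Elasticsearch connector is probably still missing many dependencies):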
FROM flink:1.7.2
ADD --chown=flink:flink http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.12/1.7.2/flink-connector-kafka_2.12-1.7.2.jar /opt/flink/lib/
ADD --chown=flink:flink http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-base_2.12/1.7.2/flink-connector-kafka-base_2.12-1.7.2.jar /opt/flink/lib/
ADD --chown=flink:flink http://central.maven.org/maven2/org/apache/flink/flink-avro/1.7.2/flink-avro-1.7.2.jar /opt/flink/lib/
ADD --chown=flink:flink http://central.maven.org/maven2/org/apache/flink/flink-json/1.7.2/flink-json-1.7.2.jar /opt/flink/lib/
ADD --chown=flink:flink http://central.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar /opt/flink/lib/
ADD --chown=flink:flink http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar /opt/flink/lib/
ADD --chown=flink:flink http://central.maven.org/maven2/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar /opt/flink/lib/
ADD --chown=flink:flink http://central.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.2.6/snappy-java-1.1.2.6.jar /opt/flink/lib/
ADD --chown=flink:flink http://central.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.7.2/flink-avro-confluent-registry-1.7.2.jar /opt/flink/lib/
ADD --chown=flink:flink http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6_2.12/1.7.2/flink-connector-elasticsearch6_2.12-1.7.2.jar /opt/flink/lib/
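Because the transitive dependencies have to be listed by hand, it is easy to forget one. As a rough sketch (the helper name missing_jars is hypothetical, and the list of expected jars is whatever you pass in; nothing is resolved automatically), a small check can report which expected jars are absent from the lib directory:

```shell
#!/bin/sh
# Print the name of every expected jar that is not present in the directory.
# Usage: missing_jars DIR JAR_NAME...
missing_jars() {
    dir="$1"
    shift
    for jar in "$@"; do
        # Report the jar only when no regular file with that name exists.
        [ -f "$dir/$jar" ] || printf '%s\n' "$jar"
    done
}

# Example (inside the container):
# missing_jars /opt/flink/lib kafka-clients-0.10.2.1.jar lz4-1.3.0.jar snappy-java-1.1.2.6.jar
```

An empty result only means the named files exist; it says nothing about whether the list itself is complete, which still has to come from the build tool's dependency report.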