Apache Flink: Kafka connector in the Python streaming API, "Cannot load user class"



I'm trying out Flink's new Python streaming API and attempting to run my script with ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py. The Python script is fairly simple: I just try to consume from an existing topic and send everything to stdout (or to the *.out file in the log directory where the output method emits its data by default).

import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema

directories = ['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
    for jar in glob.glob(os.path.join(directory, '*.jar')):
        sys.path.append(jar)

from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09

props = Properties()
config = {"bootstrap_servers": "localhost:9092",
          "group_id": "flink_test",
          "topics": ["TopicCategory-TopicName"]}

props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")

def main(factory):
    consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)
    env = factory.get_execution_environment()
    env.add_java_source(consumer) \
       .output()
    env.execute()

I grabbed a few jar files from the Maven repository, namely flink-connector-kafka-0.9_2.11-1.6.1.jar, flink-connector-kafka-base_2.11-1.6.1.jar and kafka-clients-0.9.0.1.jar, and copied them into Flink's lib directory. Unless I've misread the documentation, that should be enough for Flink to load the Kafka connector. Indeed, if I remove any of these jars the import fails, but it doesn't seem to be enough to actually invoke the plan. Adding a for loop to dynamically append them to sys.path didn't work either. This is what gets printed to the console:

Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)
The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.

And this is what I see in the logs:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

Is there a way to fix this and make the connector available to Python? I suspect it's a Jython classloader issue, but I don't know how to investigate further (also given that I know nothing about Java). Many thanks.

You're using the wrong Kafka consumer here. Your code references FlinkKafkaConsumer09, but the library you loaded is flink-connector-kafka-0.11_2.11-1.6.1.jar, which is for FlinkKafkaConsumer011. Try replacing FlinkKafkaConsumer09 with FlinkKafkaConsumer011, or use the lib file flink-connector-kafka-0.9_2.11-1.6.1.jar instead of the current one.
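For illustration, a minimal sketch of that swap, assuming the matching 0.11 jars (flink-connector-kafka-0.11_2.11-1.6.1.jar, flink-connector-kafka-base_2.11-1.6.1.jar and a 0.11.x kafka-clients) sit in Flink's lib directory. Two incidental fixes are folded in: config["topics"] is already a list, so it's passed without the extra brackets, and the consumer property is spelled "group.id", which is the name Kafka actually recognizes:

from java.util import Properties
from org.apache.flink.api.common.serialization import SimpleStringSchema
# 0.11 consumer, matching the flink-connector-kafka-0.11 jar
from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer011

props = Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "flink_test")  # Kafka's property name is "group.id"

def main(factory):
    # Pass the topic list directly, not wrapped in another list
    consumer = FlinkKafkaConsumer011(["TopicCategory-TopicName"], SimpleStringSchema(), props)
    env = factory.get_execution_environment()
    env.add_java_source(consumer) \
       .output()
    env.execute()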

I guess the jar files may have built-in imports or dependencies of their own, so three jar files are not enough. As for how to work out a Java jar's dependencies, that is what Java's Maven does; see the "Project Build Setup" page on the official website for help. In my case, I followed the official Java project setup, used "from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer", and added the following dependency to pom.xml; now I can output Kafka records to stdout with the Python API:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
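As a hedged sketch of what that version-agnostic setup might look like, assuming Flink 1.8.0 with the universal connector jar flink-connector-kafka_2.11-1.8.0.jar (where FlinkKafkaConsumer lives from Flink 1.7 onward) and its kafka-clients dependency on the classpath; the broker address and topic name are placeholders:

from java.util import Properties
from org.apache.flink.api.common.serialization import SimpleStringSchema
# Universal consumer (Flink 1.7+), not tied to a specific broker version
from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer

props = Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "flink_test")

def main(factory):
    consumer = FlinkKafkaConsumer(["TopicCategory-TopicName"], SimpleStringSchema(), props)
    env = factory.get_execution_environment()
    env.add_java_source(consumer).output()  # each record goes to the *.out file
    env.execute()

If you need to discover a jar's transitive dependencies yourself, running mvn dependency:tree in a project that declares the connector dependency lists everything it pulls in.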
