TypeError: Could not find the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'

I'm using PyFlink. I assumed that all the Java dependencies were installed along with pip install apache-flink.

The error above occurs on this line:

kafka_consumer = FlinkKafkaConsumer(
    topics='mytopic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})

Based on the error, do I need to manually provide the Kafka consumer dependency in the PyFlink environment by downloading and adding a jar?

Any advice would be appreciated.

Python Version: 3.8.2
Java Version: java 11.0.11

Since Flink is a Java/Scala-based project, the implementations of connectors and formats are shipped as jars.

The FlinkKafkaConsumer in PyFlink relies on the Java FlinkKafkaConsumer implementation.

You need to download the Kafka connector jar into PyFlink's lib directory. Typically, the lib directory path is: /usr/local/lib/python3.8.2/site-packages/pyflink/lib
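If you are not sure where that directory lives on your machine, a small helper can locate it. This is a sketch; the name pyflink_lib_dir is mine, not part of PyFlink's API:

```python
import importlib.util
import os

def pyflink_lib_dir():
    """Return the path of PyFlink's bundled lib directory, where connector jars go."""
    spec = importlib.util.find_spec("pyflink")
    if spec is None or spec.origin is None:
        return None  # pyflink is not installed in this environment
    return os.path.join(os.path.dirname(spec.origin), "lib")

print(pyflink_lib_dir())
```

Alternatively, recent PyFlink versions let you register the jar at runtime, e.g. env.add_jars("file:///path/to/the-connector.jar") on the StreamExecutionEnvironment, instead of copying it into lib.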

class FlinkKafkaConsumer(FlinkKafkaConsumerBase):
    """
    The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
    Apache Kafka. The consumer can run in multiple parallel instances, each of which will
    pull data from one or more Kafka partitions.
    The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
    during a failure, and that the computation processes elements 'exactly once'. (These guarantees
    naturally assume that Kafka itself does not lose any data.)
    Please note that Flink snapshots the offsets internally as part of its distributed checkpoints.
    The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in
    sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of
    how far the Flink Kafka consumer has consumed a topic.
    Please refer to Kafka's documentation for the available configuration properties:
    http://kafka.apache.org/documentation.html#newconsumerconfigs
    """

    def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema,
                 properties: Dict):
        """
        Creates a new Kafka streaming source consumer for Kafka 0.10.x.
        This constructor allows passing multiple topics to the consumer.

        :param topics: The Kafka topics to read from.
        :param deserialization_schema: The de-/serializer used to convert between Kafka's byte
                                       messages and Flink's objects.
        :param properties: The properties that are used to configure both the fetcher and the
                           offset handler.
        """
        JFlinkKafkaConsumer = get_gateway().jvm \
            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
        j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
                                                     JFlinkKafkaConsumer)
        super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer)

It seems that the version-specific Kafka connectors were dropped starting with Flink 1.12; check the documentation. You can now use the universal Kafka connector. Read more about it at Cloudera.

In Flink 1.12 we removed the Kafka 0.10.x and 0.11.x connectors. Please use the universal Kafka connector which works with any Kafka cluster version after 0.10.2.x.
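As a sketch of where to get the universal connector: the jar can be downloaded from Maven Central. The artifact name, Scala suffix, and version below are assumptions on my part (here the flink-sql-connector-kafka fat jar, which bundles the Kafka client), so match them to your installed Flink and Scala versions:

```python
# Build the Maven Central URL for the universal Kafka connector fat jar.
# Artifact name, Scala suffix, and version are assumptions -- match your Flink install.
MAVEN_CENTRAL = "https://repo.maven.apache.org/maven2"

def kafka_connector_url(flink_version, scala_version="2.11"):
    artifact = "flink-sql-connector-kafka_" + scala_version
    return "{}/org/apache/flink/{}/{}/{}-{}.jar".format(
        MAVEN_CENTRAL, artifact, flink_version, artifact, flink_version)

print(kafka_connector_url("1.13.0"))
```

Download the jar at that URL and place it in PyFlink's lib directory as described above.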
