我正在尝试运行官方的"Kafka010Example.scala",但不幸的是,它没有按预期从输入主题读取并写入输出。我错过了什么或做错了什么?任何帮助或提示非常感谢。这正是我所做的:
-
在 docker 容器中启动了 kafka(spotify/kafka:latest)
$ docker run -d -p 2181:2181 -p 9092:9092 spotify/kafka:latest
-
在容器内启动了 bash 会话:
$ docker exec -it 26d1cfced4cb /bin/bash
-
创建的输入和输出主题:
$ /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
$ /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-output
-
推出"Kafka010Example.scala",在本地模式(MiniCluster)下运行
flink 1.3.2
,flink-connector-kafka-0.10_2.11
具有以下参数:
--input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
-
原木:https://file.io/jWsqI8
-
向主题发送了一些消息:
$ /opt/kafka_2.11-0.10.1.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-input
blah
blahh
blahhh
-
在输出主题上检查偏移量 - 无
$ /opt/kafka_2.11-0.10.1.0/bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group myconsumer --topic test-output
Group Topic Pid Offset logSize Lag Owner myconsumer test-output 0 0 0 0 none
-
检查使用者组偏移量 - 无
$ /opt/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group myconsumer
No topic available for consumer group provided GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
我建议使用控制台使用者来观看测试输出主题。我不希望测试输出主题的偏移量已经推进,因为没有人从中读取过。
步骤 7 应该是:
$ /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --group myconsumer --topic test-output