我正在尝试用Flink的KafkaSource运行一个简单的测试程序。我正在使用以下内容:
- 闪烁0.9
- Scala 2.10.4
- 卡夫卡0.8.2.1
我按照文档测试了KafkaSource(添加了依赖项,将Kafka连接器flink连接器Kafka捆绑在插件中(,如下所述。
下面是我的简单测试程序:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka
object TestKafka {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env
.addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
.print
}
}
然而,编译总是抱怨找不到KafkaSource:
[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR] .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
我在这里想念什么?
我是sbt用户,所以我使用了以下build.sbt
:
organization := "pl.japila.kafka"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
让我可以运行程序:
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._
object TestKafka {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env
.addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
.print
}
}
输出:
[kafka-flink]> run
[info] Running TestKafka
log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
问题似乎是SBT和Maven配置文件不能很好地配合使用。
Flink POM将Scala版本(2.10、2.11、…(称为一个变量,其中一些在构建概要文件中定义。SBT未正确评估配置文件,因此包装无法正常工作。
存在一个问题和待处理的拉取请求以解决此问题:https://issues.apache.org/jira/browse/FLINK-2408
object FlinkKafkaStreaming {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "flink-kafka")
val stream = env.addSource(new FlinkKafkaConsumer08[String]
("your_topic_name",new SimpleStringSchema(), properties))
stream.setParallelism(1).writeAsText("your_local_dir_path")
env.execute("XDFlinkKafkaStreaming")
}
}
为了进行测试,您可以执行以下操作:
- 首先运行flink演示
- 运行Kafka_Pouder