object FlinkKafkaConsumer010 is not a member of package org.apache.flink.streaming.connectors.kafka



I am trying to put together a small program that connects to a Kafka topic with Apache Flink. I need to use FlinkKafkaConsumer010.

package uimp

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import java.util.Properties

object Silocompro {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val propertiesTopicDemographic = new Properties()
    propertiesTopicDemographic.setProperty("bootstrap.servers", "bigdata.dataspartan.com:19093")
    propertiesTopicDemographic.setProperty("group.id", "demographic")

    val myConsumerDemographic = new FlinkKafkaConsumer010[String](
      "topic_demographic", new SimpleStringSchema(), propertiesTopicDemographic)

    val messageStreamDemographic = env
      .addSource(myConsumerDemographic)
      .print()

    env.execute("Flink Scala API Skeleton")
  }
}

My problem is that when I try to assemble the program with this build.sbt, the compiler returns the error "object FlinkKafkaConsumer010 is not a member of package org.apache.flink.streaming.connectors.kafka":

ThisBuild / resolvers ++= Seq(
  "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
  Resolver.mavenLocal)

name := "silocompro"
version := "1.0"
organization := "uimp"

ThisBuild / scalaVersion := "2.12.11"

val flinkVersion = "1.9.0"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-core" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-base" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "provided")

lazy val root = (project in file(".")).
  settings(libraryDependencies ++= flinkDependencies)

assembly / mainClass := Some("uimp.Silocompro")

Compile / run := Defaults.runTask(Compile / fullClasspath,
  Compile / run / mainClass,
  Compile / run / runner
).evaluated

Compile / run / fork := true
Global / cancelable := true

assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)

What is the cause of this dependency error?

The connectors are not part of the Flink binary distribution, which means you need the connector in compile scope, so basically you need to remove provided from those dependencies. With this setup the application will run on a cluster.
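A minimal sketch of that change for the cluster case, using only the artifacts and version already listed in the question's build.sbt (only the scope of the Kafka connector changes):

val flinkVersion = "1.9.0"

val flinkDependencies = Seq(
  // core Flink modules are supplied by the cluster at runtime, so they can stay "provided"
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  // connectors are not bundled with the Flink distribution, so keep them in
  // compile scope (no "provided") and let sbt-assembly pack them into the fat jar
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion)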

However, if you want to run it locally without starting a cluster, you should put all the Flink dependencies in compile scope, i.e. remove all the provided scope declarations.
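For the local case, a sketch under the same assumptions simply drops the "provided" markers altogether, so that a plain sbt run has every Flink class on the classpath:

val flinkVersion = "1.9.0"

val flinkDependencies = Seq(
  // no "provided" anywhere: everything is packaged and available for local runs
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-clients" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion)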

In the end it was a dependency problem. I did a few things:

  1. I added a new resolver: https://oss.sonatype.org/content/repositories
  2. I uninstalled the Metals (Scala) plugin from VS Code
  3. I added "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion to my flinkDependencies (see the sketch after this list)

After these changes my library dependency problem was solved. Thanks.
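For reference, a sketch of how steps 1 and 3 could look in the build.sbt from the question. The resolver name used here is made up; the URL is the one from step 1. Note that in Flink 1.9 the universal flink-connector-kafka artifact only ships the unversioned FlinkKafkaConsumer, while FlinkKafkaConsumer010 lives in the Kafka-0.10-specific artifact flink-connector-kafka-0.10, which is why step 3 resolves the original compile error:

// "Sonatype OSS" is a hypothetical resolver name; the URL is the one from step 1
ThisBuild / resolvers += "Sonatype OSS" at "https://oss.sonatype.org/content/repositories"

val flinkVersion = "1.9.0"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-core" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-base" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "provided",
  // new entry from step 3: FlinkKafkaConsumer010 is defined in this Kafka-0.10-specific connector
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion)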
