I'm trying to get data from Kafka into Flink. I used FlinkKafkaConsumer, but IntelliJ tells me it is deprecated, and the SSH console in Google Cloud also shows this error: object connectors is not a member of package org.apache.flink.streaming. How can I rewrite this so it works?
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
val topic = "test"
val server = "localhost:9092"
val properties = new Properties()
properties.setProperty("bootstrap.servers", server)
properties.setProperty("group.id", "test")
val inputStream = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))
val crimes = inputStream
  .filter(s => !s.startsWith("ID"))
  .map(_.split(","))
  .map(array => CrimeEvent(
    date = array(1),
    category = IUCRToCategory.getOrElse(array(2), "unrecognized"),
    district = array(5),
    ifArrest = BooleanToInt(array(3) == "True"),
    domestic = BooleanToInt(array(4) == "True"),
    ifFBI = IUCRToFBI.getOrElse(array(2), 0)
  ))
Flink's FlinkKafkaConsumer is indeed deprecated and has been replaced by KafkaSource. The builder's API reference is at https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html

Here is a snippet in Scala:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

val source = KafkaSource.builder[String]
.setBootstrapServers("localhost:9092")
.setTopics("topic")
.setGroupId("grp")
.setStartingOffsets(OffsetsInitializer.earliest)
.setValueOnlyDeserializer(new SimpleStringSchema)
.build
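To attach the new source to your job, use env.fromSource (which requires a WatermarkStrategy) instead of the old env.addSource. A minimal sketch, reusing the env, topic, group id, and pipeline from your question (CrimeEvent and the lookup maps are assumed to be your existing definitions):

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

// Build the source exactly as above, with your own topic and servers.
val source = KafkaSource.builder[String]
  .setBootstrapServers("localhost:9092")
  .setTopics("test")
  .setGroupId("test")
  .setStartingOffsets(OffsetsInitializer.earliest)
  .setValueOnlyDeserializer(new SimpleStringSchema)
  .build

// fromSource replaces addSource; a WatermarkStrategy is mandatory here.
// noWatermarks is fine if you don't use event time.
val inputStream = env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "kafka-source")

// The rest of your pipeline stays unchanged.
val crimes = inputStream.filter(s => !s.startsWith("ID")).map(_.split(","))
```

Note that KafkaSource lives in the flink-connector-kafka artifact; your "object connectors is not a member of package org.apache.flink.streaming" error usually means the Kafka connector dependency is missing from the build, so check that it is declared (with a version matching your Flink version) before recompiling.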