Overloaded constructor of FlinkKafkaProducer



I am trying to publish messages to a Kafka topic using Scala and Flink. However, when I create the FlinkKafkaProducer object with the code provided in the documentation, the compiler tells me the constructor cannot be applied. Here is the code sample:

val studentProducer = new FlinkKafkaProducer[String](
  "my_topic",                  // target topic
  new SimpleStringSchema(),    // serialization schema
  properties,                  // producer config
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance

with the following imports:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import java.util.Properties

This is the error I get:

/home/user/Flink/flinkproj/src/main/scala/org/flink/Job.scala:83:27: overloaded method constructor FlinkKafkaProducer with alternatives:
[error]   (x$1: String,x$2: org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema[String],x$3: java.util.Properties,x$4: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic)org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer[String] <and>
[error]   (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedSerializationSchema[String],x$3: java.util.Properties,x$4: java.util.Optional[org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner[String]])org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer[String] <and>
[error]   (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedSerializationSchema[String],x$3: java.util.Properties,x$4: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic)org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer[String] <and>
[error]   (x$1: String,x$2: org.apache.flink.api.common.serialization.SerializationSchema[String],x$3: java.util.Properties,x$4: java.util.Optional[org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner[String]])org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer[String]
[error]  cannot be applied to (String, org.apache.flink.streaming.util.serialization.SimpleStringSchema, java.util.Properties, org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic)
[error]     val studentProducer = new FlinkKafkaProducer[String](

The variable properties is an instance of java.util.Properties. I suspect the problem is in the string serializer, but I can't see what's wrong.

The version details, from build.sbt, are as follows:

ThisBuild / scalaVersion := "2.11.8"
val flinkVersion = "1.11.3"
val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "provided")

I think the documentation is out of date: you need to provide either a KafkaSerializationSchema or a KeyedSerializationSchema, or, if you use a plain SerializationSchema, you also need to provide an Optional FlinkKafkaPartitioner.
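For example, one of the overloads listed in the error message takes a SerializationSchema plus an Optional partitioner. A minimal sketch, reusing the topic name and properties from the question (note this overload has no Semantic argument, so as far as I know it falls back to the default, AT_LEAST_ONCE, rather than EXACTLY_ONCE):

```scala
import java.util.Optional
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

// Sketch: the SerializationSchema overload requires an explicit (possibly
// empty) partitioner argument; Optional.empty() means records are
// partitioned by Kafka's own partitioner rather than a Flink one.
val studentProducer = new FlinkKafkaProducer[String](
  "my_topic",
  new SimpleStringSchema(),  // the org.apache.flink.api.common.serialization variant
  properties,
  Optional.empty[FlinkKafkaPartitioner[String]]())
```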

I don't have a Scala example at hand, but here is a Java example showing how to implement a KafkaSerializationSchema that writes JSON using ObjectMapper:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

/**
 * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
 */
public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    private String topic;

    public ClickEventStatisticsSerializationSchema() {
    }

    public ClickEventStatisticsSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            final ClickEventStatistics message, @Nullable final Long timestamp) {
        try {
            // if topic is null, the producer's default topic is used
            return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Could not serialize record: " + message, e);
        }
    }
}
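For plain strings, a rough Scala equivalent of the above is an anonymous KafkaSerializationSchema, which also makes the EXACTLY_ONCE overload from the question compile. This is an untested sketch; "my_topic" and properties are the values from the question:

```scala
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

// Sketch: serialize each String element as UTF-8 bytes into the target topic.
val schema = new KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]]("my_topic", element.getBytes(StandardCharsets.UTF_8))
}

val studentProducer = new FlinkKafkaProducer[String](
  "my_topic",                                // default target topic
  schema,                                    // matches the KafkaSerializationSchema overload
  properties,                                // producer config
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE)  // fault-tolerance
```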
