FlinkKafkaProducer扩展KafkaSerializationSchema的Scala字符串接收器



[Flink版本-1.9]

因此,我试图将一个字符串(JSON格式(转换为kafka主题,并且有点拘泥于如何实现KafkaSerializationSchema来转换字符串。SimpleStringSchema似乎不适用于FlinkKafkaProducer,因为它需要一个KafkaSerializationSchema。

如果已经有一些代码是像SimpleStringSchema这样的util for kafka,我希望它更好,但如果我必须自己写,有人能解释为什么我从java中的另一个stackoverflow帖子转换的scala代码基本上做了同样的事情,却没有覆盖任何东西吗?

def defineKafkaDataSink(topic: String,
kafkaBootstrapServer: String = "localhost:9092"):FlinkKafkaProducer[String] = {
val properties = new Properties()
properties.setProperty("bootstrap.servers", kafkaBootstrapServer)
new FlinkKafkaProducer[String](topic,new ProducerStringSerializationSchema(topic),properties,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
}

// Not sure why serialize doesnt override anything
// working from a java stack overflow post
// https://stackoverflow.com/questions/58644549/how-to-implement-flinkkafkaproducer-serializer-for-kafka-2-2
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
import java.nio.charset.StandardCharsets
class ProducerStringSerializationSchema(var topic: String) extends KafkaSerializationSchema[String] {
override def serialize(element: String, timestamp: Long) = new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
}

这实际上更像是一个scala问题,而不是一个棘手的问题。确保这些类型与相应的java类型完全对齐。scala习惯于提供外观非常相似的类型,这些类型与java类型不同。

override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]]

特别是仔细检查Long是否确实是javaLong而不是scalaLong。

如果将来遇到类似的问题,让IDE生成方法存根通常会更容易,因为它会"知道"正确的签名。所以,只要注释掉你的方法,让IDE抱怨你实际上有未实现的接口。然后您的IDE希望为您添加它们。

最新更新