How do I add a message key to KafkaSink in Apache Flink 1.14?



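As the title says, I need to set a custom message key in KafkaSink. I can't find any indication in the Apache Flink 1.14 documentation of how to do this. At the moment the KafkaSink is set up correctly and the data payload is written to the topic correctly, but the key is null. Any suggestions? Thanks in advance.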

You should implement a KafkaRecordSerializationSchema that sets the key on the ProducerRecord returned by its serialize method.

You would create the sink more or less like this:

KafkaSink<UsageRecord> sink =
    KafkaSink.<UsageRecord>builder()
        .setBootstrapServers(brokers)
        .setKafkaProducerConfig(kafkaProps)
        .setRecordSerializer(new MyRecordSerializationSchema(topic))
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-record-producer")
        .build();

And the serializer would look something like this:

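// Jackson imports assume the plain com.fasterxml artifacts are on the classpath.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyRecordSerializationSchema
        implements KafkaRecordSerializationSchema<UsageRecord> {

    private static final long serialVersionUID = 1L;

    private String topic;

    // Shared JSON mapper; writes java.time values as ISO-8601 strings.
    private static final ObjectMapper objectMapper =
            JsonMapper.builder()
                    .build()
                    .registerModule(new JavaTimeModule())
                    .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

    public MyRecordSerializationSchema() {}

    public MyRecordSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            UsageRecord element, KafkaSinkContext context, Long timestamp) {
        try {
            return new ProducerRecord<>(
                    topic,
                    null, // choosing not to specify the partition
                    element.ts.toEpochMilli(), // record timestamp
                    element.getKey(), // message key; must be a byte[]
                    objectMapper.writeValueAsBytes(element)); // JSON payload
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(
                    "Could not serialize record: " + element, e);
        }
    }
}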

Note that this example also sets the timestamp.

For what it's worth, this example comes from https://github.com/alpinegizmo/flink-mobile-data-usage/blob/main/src/main/java/com/ververica/flink/example/datausage/records/UsageRecordSerializationSchema.java.
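Since serialize returns a ProducerRecord<byte[], byte[]>, both the key and the value have to be byte arrays, and the timestamp is expressed in milliseconds since the epoch. Here is a minimal sketch of those two conversions, assuming the key is a string field encoded as UTF-8. The KeyEncoding class and its keyBytes and recordTimestamp helpers are hypothetical names for illustration, not part of Flink or Kafka:

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

// Hypothetical helper for illustration; not part of Flink or Kafka.
final class KeyEncoding {

    // Encodes a string key as UTF-8 bytes. A null key stays null,
    // which leaves the Kafka record without a key.
    static byte[] keyBytes(String key) {
        return key == null ? null : key.getBytes(StandardCharsets.UTF_8);
    }

    // Converts an event time to milliseconds since the Unix epoch,
    // the unit used for Kafka record timestamps.
    static long recordTimestamp(Instant ts) {
        return ts.toEpochMilli();
    }

    public static void main(String[] args) {
        byte[] key = keyBytes("account-42");
        System.out.println("key length in bytes: " + key.length);
        System.out.println("timestamp: " + recordTimestamp(Instant.now()));
    }
}
```

Inside a serializer, the results of helpers like these would be passed as the timestamp and key arguments of the ProducerRecord constructor.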

This example is for Scala programmers. Here, we define the key by generating a UUID for each event.

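import java.lang
import java.nio.charset.StandardCharsets
import java.util.UUID

import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.kafka.clients.producer.ProducerRecord

// The topic name is passed in so the schema does not depend on outer state.
class MyRecordSerializationSchema(kafkaTopicName: String)
    extends KafkaRecordSerializationSchema[String] {

  override def serialize(
      element: String,
      context: KafkaRecordSerializationSchema.KafkaSinkContext,
      timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord(
      kafkaTopicName,
      UUID.randomUUID.toString.getBytes(StandardCharsets.UTF_8), // key: a fresh UUID per event
      element.getBytes(StandardCharsets.UTF_8) // value: the event itself
    )
}

In the main class, pass an instance of this class when defining the Kafka sink, like this:

// The explicit [String] keeps Scala from inferring Nothing for the builder's type parameter.
val sinkKafka: KafkaSink[String] = KafkaSink.builder[String]()
  .setBootstrapServers(bootstrapServerUrl) // bootstrap server URL
  .setRecordSerializer(new MyRecordSerializationSchema(kafkaTopicName))
  .build()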
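One thing to keep in mind with a random UUID key: every record gets a different key, and Kafka's default partitioner hashes the key bytes, so records that belong to the same entity will not reliably land on the same partition. If per-key ordering matters, a key derived from the event itself may be a better fit. The sketch below (in Java, to match the first answer) contrasts the two choices; KeyChoice, randomKey, and stableKey are hypothetical names, and entityId stands in for whatever identifier your events carry:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

// Hypothetical helper for illustration; not part of Flink or Kafka.
final class KeyChoice {

    // Same idea as the Scala example: a fresh random key for every event.
    static byte[] randomKey() {
        return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
    }

    // Assumed alternative: derive the key from a stable identifier in the
    // event, so equal identifiers always produce equal key bytes.
    static byte[] stableKey(String entityId) {
        return entityId.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Two random keys differ, two stable keys for the same id match.
        System.out.println(Arrays.equals(randomKey(), randomKey()));
        System.out.println(Arrays.equals(stableKey("user-1"), stableKey("user-1")));
    }
}
```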
