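As the title says, I need to set a custom message key in KafkaSink. I can't find any indication in the Apache Flink 1.14 documentation of how to do this. At the moment the KafkaSink is set up correctly and the data payload is written to the topic, but the key is null. Any suggestions? Thanks in advance.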
You should implement a KafkaRecordSerializationSchema that sets the key on the ProducerRecord returned by its serialize method.
You would create the sink more or less like this:
KafkaSink<UsageRecord> sink =
    KafkaSink.<UsageRecord>builder()
        .setBootstrapServers(brokers)
        .setKafkaProducerConfig(kafkaProps)
        .setRecordSerializer(new MyRecordSerializationSchema(topic))
        // Flink 1.14 spelling; later releases name this setDeliveryGuarantee
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-record-producer")
        .build();
and the serializer would be something like this:
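import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyRecordSerializationSchema
        implements KafkaRecordSerializationSchema<UsageRecord> {

    private static final long serialVersionUID = 1L;

    private String topic;

    // Shared JSON mapper that writes java.time values as ISO-8601 strings
    private static final ObjectMapper objectMapper =
        JsonMapper.builder()
            .build()
            .registerModule(new JavaTimeModule())
            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

    public MyRecordSerializationSchema() {}

    public MyRecordSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            UsageRecord element, KafkaSinkContext context, Long timestamp) {
        try {
            return new ProducerRecord<>(
                topic,
                null, // choosing not to specify the partition
                element.ts.toEpochMilli(), // record timestamp in epoch millis
                element.getKey(), // message key; getKey() must return byte[]
                objectMapper.writeValueAsBytes(element)); // JSON payload
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(
                "Could not serialize record: " + element, e);
        }
    }
}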
Note that this example also sets the timestamp.
For reference, this example comes from https://github.com/alpinegizmo/flink-mobile-data-usage/blob/main/src/main/java/com/ververica/flink/example/datausage/records/UsageRecordSerializationSchema.java.
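As a minimal sketch of the two non-payload arguments passed to ProducerRecord above: the key has to be a byte[], and the timestamp is given in epoch milliseconds. The class name RecordFields, its two helper methods and the sample key "account-42" are made up for illustration and are not part of Flink or Kafka. It assumes the key field is a String and the event time is an Instant.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

final class RecordFields {

    /** Encodes a String key as UTF-8 bytes; a null key is passed through as null. */
    static byte[] keyBytes(String key) {
        return key == null ? null : key.getBytes(StandardCharsets.UTF_8);
    }

    /** Converts an event time to epoch milliseconds. */
    static long timestampMillis(Instant eventTime) {
        return eventTime.toEpochMilli();
    }

    public static void main(String[] args) {
        byte[] key = keyBytes("account-42");
        long ts = timestampMillis(Instant.parse("2021-01-01T00:00:00Z"));
        // prints "account-42 @ 1609459200000"
        System.out.println(new String(key, StandardCharsets.UTF_8) + " @ " + ts);
    }
}
```

Specifying the charset explicitly keeps the key bytes identical across JVMs with different default encodings.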
This example is for Scala programmers. Here, we define the key by generating a UUID for each event.
import java.lang
import java.nio.charset.StandardCharsets
import java.util.UUID

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

class MyRecordSerializationSchema extends KafkaRecordSerializationSchema[String] {
  override def serialize(
      element: String,
      context: KafkaRecordSerializationSchema.KafkaSinkContext,
      timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    // kafkaTopicName is assumed to be defined elsewhere in scope
    new ProducerRecord(
      kafkaTopicName,
      UUID.randomUUID.toString.getBytes(StandardCharsets.UTF_8), // key: a fresh UUID per event
      element.getBytes(StandardCharsets.UTF_8) // value: the payload
    )
  }
}
In the main class, you have to pass an instance of this class when defining the Kafka sink, like this:
val sinkKafka: KafkaSink[String] = KafkaSink.builder[String]()
  .setBootstrapServers(bootstrapServerUrl) // Bootstrap server URL
  .setRecordSerializer(new MyRecordSerializationSchema())
  .build()
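One thing to keep in mind with a random UUID key: when a record has a key, Kafka's default partitioner picks the partition by hashing the key bytes, so a fresh key per event spreads records across partitions rather than grouping related ones. If records for the same entity should land on the same partition, a key derived from a stable field is an option. A rough Java sketch of both variants follows; EventKeys, randomKey, stableKey and the id "device-7" are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

final class EventKeys {

    /** Random key: unique per call, so related events get unrelated keys. */
    static byte[] randomKey() {
        return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Deterministic key: the same business id always yields the same key bytes. */
    static byte[] stableKey(String businessId) {
        // nameUUIDFromBytes builds a name-based (type 3) UUID from the input bytes
        UUID id = UUID.nameUUIDFromBytes(businessId.getBytes(StandardCharsets.UTF_8));
        return id.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        boolean sameStable = Arrays.equals(stableKey("device-7"), stableKey("device-7"));
        boolean sameRandom = Arrays.equals(randomKey(), randomKey());
        System.out.println("stable keys equal: " + sameStable);
        System.out.println("random keys equal: " + sameRandom);
    }
}
```

Either method returns a byte[] that can be passed as the key argument of the ProducerRecord built in serialize.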