我正在尝试配置Spring Kafka,以便通过KafkaTemplate
发送的事件不包括magic__TypeId__
头。从我在文档和网上读到的,JsonSerializer.ADD_TYPE_INFO_HEADERS
应该给我想要的结果。但我不使用春天的JsonSerializer
,相反,我使用org.apache.kafka.common.serialization.ByteArraySerializer
与ByteArrayJsonMessageConverter
相结合。我很困惑,在我的情况下,header在哪里设置,如何禁用它?
下面是我的代码配置:
@TestConfiguration
public static class TestKafkaProducerConfig {
@Bean
public KafkaTemplate<String, MyEvent> kafkaTemplate(
@Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {
var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
template.setDefaultTopic(topicName);
var headerMapper = new SimpleKafkaHeaderMapper();
headerMapper.setMapAllStringsOut(true);
var messageConverter = new ByteArrayJsonMessageConverter();
messageConverter.setHeaderMapper(headerMapper);
template.setMessageConverter(messageConverter);
return template;
}
@Bean
public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
}
public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
}
如果我使用JsonSerializer
,那么JsonSerializer.ADD_TYPE_INFO_HEADERS
设置受到尊重,并且我没有得到__TypeId__
头集。但是,然后我不能使用我的自定义MessageConverter
与SimpleKafkaHeaderMapper
和mapAllStringsOut
,这阻止了另一个神奇的头spring_json_header_types
。
我的总体目标是避免Spring Kafka设置任何魔法头,如__TypeId__
和spring_json_header_types
。
这些头确实是从该转换器填充的。它委托给typeMapper
:
this.typeMapper.fromClass(message.getPayload().getClass(), headers);
其中DefaultJackson2JavaTypeMapper
在上述__TypeId__
标头周围添加了逻辑:
public void fromJavaType(JavaType javaType, Headers headers) {
String classIdFieldName = getClassIdFieldName();
if (headers.lastHeader(classIdFieldName) != null) {
removeHeaders(headers);
}
addHeader(headers, classIdFieldName, javaType.getRawClass());
默认getClassIdFieldName()
为AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME
。
所以,在ByteArraySerializer
和ByteArrayJsonMessageConverter
的当前组合中,没有办法阻止标题出现在ProducerRecord
中。除非你注入一个自定义的Jackson2JavaTypeMapper
,用一个空的主体覆盖fromJavaType()
。
你真的可以用DefaultKafkaHeaderMapper
和它的setMapAllStringsOut(true)
来避免它,如果你要发送的所有头都是String
或已经是byte[]
,就不会有spring_json_header_types
额外的头。
查看DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns)
,只提供那些你真正需要发送的头。
正如@artem-bilan所解释的那样,在使用ByteArraySerializer
和ByteArrayJsonMessageConverter
时,要摆脱__TypeId__
并不容易。所以,我用JsonSerializer
和MessagingMessageConverter
代替。下面是我的配置,它产生Kafka事件没有任何魔法头。
@TestConfiguration
public static class TestKafkaProducerConfig {
@Bean
public KafkaTemplate<String, MyEvent> kafkaTemplate(
@Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {
var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
template.setDefaultTopic(topicName);
// Use SimpleKafkaHeaderMapper which does not add the json types header, `spring_json_header_types`.
// The mapAllStringsOut when set to true, all string-valued headers will be converted to byte[] using
// the charset property (default UTF-8).
var headerMapper = new SimpleKafkaHeaderMapper();
headerMapper.setMapAllStringsOut(true);
var messageConverter = new MessagingMessageConverter();
messageConverter.setHeaderMapper(headerMapper);
template.setMessageConverter(messageConverter);
return template;
}
@Bean
public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
}
public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
}