避免Spring Kafka设置的__TypeId__和spring_json_header_types魔法头



我正在尝试配置Spring Kafka,以便通过KafkaTemplate发送的事件不包括magic__TypeId__头。从我在文档和网上读到的,JsonSerializer.ADD_TYPE_INFO_HEADERS应该给我想要的结果。但我不使用春天的JsonSerializer,相反,我使用org.apache.kafka.common.serialization.ByteArraySerializerByteArrayJsonMessageConverter相结合。我很困惑,在我的情况下,header在哪里设置,如何禁用它?

下面是我的代码配置:

@TestConfiguration
public static class TestKafkaProducerConfig {
@Bean
public KafkaTemplate<String, MyEvent> kafkaTemplate(
@Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {
var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
template.setDefaultTopic(topicName);
var headerMapper = new SimpleKafkaHeaderMapper();
headerMapper.setMapAllStringsOut(true);
var messageConverter = new ByteArrayJsonMessageConverter();
messageConverter.setHeaderMapper(headerMapper);
template.setMessageConverter(messageConverter);
return template;
}
@Bean
public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
}
public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
}

如果我使用JsonSerializer,那么JsonSerializer.ADD_TYPE_INFO_HEADERS设置受到尊重,并且我没有得到__TypeId__头集。但是,然后我不能使用我的自定义MessageConverterSimpleKafkaHeaderMappermapAllStringsOut,这阻止了另一个神奇的头spring_json_header_types

我的总体目标是避免Spring Kafka设置任何魔法头,如__TypeId__spring_json_header_types

这些头确实是从该转换器填充的。它委托给typeMapper:

this.typeMapper.fromClass(message.getPayload().getClass(), headers);

其中DefaultJackson2JavaTypeMapper在上述__TypeId__标头周围添加了逻辑:

public void fromJavaType(JavaType javaType, Headers headers) {
String classIdFieldName = getClassIdFieldName();
if (headers.lastHeader(classIdFieldName) != null) {
removeHeaders(headers);
}
addHeader(headers, classIdFieldName, javaType.getRawClass());

默认getClassIdFieldName()AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME

所以,在ByteArraySerializerByteArrayJsonMessageConverter的当前组合中,没有办法阻止标题出现在ProducerRecord中。除非你注入一个自定义的Jackson2JavaTypeMapper,用一个空的主体覆盖fromJavaType()

你真的可以用DefaultKafkaHeaderMapper和它的setMapAllStringsOut(true)来避免它,如果你要发送的所有头都是String或已经是byte[],就不会有spring_json_header_types额外的头。

查看DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns),只提供那些你真正需要发送的头。

正如@artem-bilan所解释的那样,在使用ByteArraySerializerByteArrayJsonMessageConverter时,要摆脱__TypeId__并不容易。所以,我用JsonSerializerMessagingMessageConverter代替。下面是我的配置,它产生Kafka事件没有任何魔法头。

@TestConfiguration
public static class TestKafkaProducerConfig {
@Bean
public KafkaTemplate<String, MyEvent> kafkaTemplate(
@Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {
var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
template.setDefaultTopic(topicName);
// Use SimpleKafkaHeaderMapper which does not add the json types header, `spring_json_header_types`.
// The mapAllStringsOut when set to true, all string-valued headers will be converted to byte[] using
// the charset property (default UTF-8).
var headerMapper = new SimpleKafkaHeaderMapper();
headerMapper.setMapAllStringsOut(true);
var messageConverter = new MessagingMessageConverter();
messageConverter.setHeaderMapper(headerMapper);
template.setMessageConverter(messageConverter);
return template;
}
@Bean
public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
}
public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
}

相关内容

  • 没有找到相关文章

最新更新