如何使Spring Kafka JsonDeserializer在反序列化为OffsetDateTime时保留时区偏移量



我通过Kafka收到了一条消息,我知道其中有一个非UTC时区。当我使用org.apache.kafka.common.serialization.StringDeserializer来验证这一点时,我得到了ISO 8601格式的正确时间戳和时区:

{  "id": "e499f2e8-a50e-4ff8-a9fe-0eaf9d3314bf", "sent_ts": "2021-02-04T14:06:10+01:00" }

当我切换到org.springframework.kafka.support.serializer.JsonDeserializer时,这将丢失。我的POJO是这样的:

public class MyMessage {
@JsonProperty("id")
private String id;
@JsonProperty("sent_ts")
private OffsetDateTime sentTs;
@Override
public String toString() {
return "MyMessage{" +
"id='" + id + ''' +
", sentTs=" + sentTs +
'}';
}

当我记录我收到的消息时,我得到:

MyMessage{id='e499f2e8-a50e-4ff8-a9fe-0eaf9d3314bf', sentTs=2021-02-04T13:06:10Z}

我认为JsonDeserializer必须使用Jackson,所以在我的application.yml配置中,我设置了:

spring.jackson:
deserialization.ADJUST_DATES_TO_CONTEXT_TIME_ZONE: false

这没用。我还尝试了一个定制程序:

@Configuration
public class ObjectMapperBuilderCustomizer implements Jackson2ObjectMapperBuilderCustomizer {
@Override
public void customize(Jackson2ObjectMapperBuilder builder) {
builder.modules(new JavaTimeModule());
builder.featuresToDisable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
}
}

这也没用。

我认为它可能需要成为卡夫卡消费者的财产,所以我也尝试了:

spring:
consumer:
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.jackson.deserialization.ADJUST_DATES_TO_CONTEXT_TIME_ZONE: false

仍然不起作用。

有没有办法让JsonDeserializer正常工作并保持正确的时区偏移?

当您喜欢这个value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer时,该类的实例是由Apache Kafka客户端代码创建的,该客户端代码完全不知道Spring配置。

如果你想依赖Spring Boot配置的ObjectMapper和你的定制,你应该考虑这样做:

@Bean
DefaultKafkaConsumerFactory kafkaConsumerFactory(KafkaProperties properties, ObjectMapper objectMapper) {
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper);
jsonDeserializer.configure(consumerProperties, false);
return new DefaultKafkaConsumerFactory(consumerProperties, 
new StringDeserializer(), jsonDeserializer);
}

注意我怎么称呼jsonDeserializer.configure(consumerProperties, false);。这样,您仍然可以在applicaiton.yml中为Kafka使用者配置其余属性。

请考虑为Spring Boot提出GH问题,因此我们将修改如何处理JsonDeserializer和自动配置的ObjectMapper,以提供更好的最终用户体验。

相关内容

最新更新