这个问题是我上一个问题的后续问题,在这个问题中,我询问了使用自定义Avro
Serdes对Kafka流进行序列化的问题。现在我在尝试配置JSON
Serde时遇到了一个不同的问题。我有一个kafka流拓扑,其中我使用了groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification()))
。
@Service
@Slf4j
@EnableBinding(PosListenerAvroJsonBinding.class)
public class NotificationAvroJsonProcessorService {
@Autowired
RecordBuilder recordBuilder;
@StreamListener("notification-input-avro-channel")
@SendTo("notification-output-json-channel")
public KStream<String, Notification> process(KStream<String, PosInvoiceAvro> input) {
/* with reduce transformation and serialization with KTable */
KStream<String, Notification> notificationJsonKStream = input
.filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
.map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationJson(v)))
// ***********************************************
// THIS DOES NOT WORK WITH JSON, only works with AVRO.
.groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification()))
// ***********************************************
.reduce((aggValue, newValue) -> {
newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
return newValue;
})
.toStream();
notificationJsonKStream.foreach((k, v) -> log.info(String.format("Notification JSON agg - key: %s, value: %s", k, v)));
return notificationJsonKStream;
}
}
我基于这个网页定义了自定义序列化程序。我想我必须使用com.fasterxml.jackson.databind.JsonNode
,但它不起作用。我还测试了其他被评论的选项,它们也不起作用。
public class CustomSerdes extends Serdes {
private final static Map<String, String> serdeConfig = Stream.of(
new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
, new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.fasterxml.jackson.databind.JsonNode")
// , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.github.felipegutierrez.explore.spring.model.Notification")
// , new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, "com.fasterxml.jackson.databind.JavaType")
, new AbstractMap.SimpleEntry<>(TYPE_PROPERTY, TYPE_PROPERTY_DEFAULT)
// , new AbstractMap.SimpleEntry<>("json.value.type", "org.springframework.kafka.support.serializer.JsonSerializer")
)
public static Serde<Notification> Notification() {
final Serde<Notification> notificationSerde = new KafkaJsonSchemaSerde<Notification>();
notificationSerde.configure(serdeConfig, false);
return notificationSerde;
}
在网页上还说要定义java类的type.property=javaType, the JSON schema could specify "javaType":"org.acme.MyRecord" at the top level
。
@lombok.Data
@lombok.AllArgsConstructor
@lombok.NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSchemaInject(strings = {@JsonSchemaString(path = "javaType", value = "com.github.felipegutierrez.explore.spring.model.Notification")})
public class Notification {
@JsonProperty("InvoiceNumber")
private String InvoiceNumber;
@JsonProperty("CustomerCardNo")
private String CustomerCardNo;
@JsonProperty("TotalAmount")
private Double TotalAmount;
@JsonProperty("EarnedLoyaltyPoints")
private Double EarnedLoyaltyPoints;
@JsonProperty("TotalLoyaltyPoints")
private Double TotalLoyaltyPoints = 0.0;
}
当我为Spring+Kafka使用默认的JSON序列化程序时,我只在application.yaml
上设置了spring.json.add.type.headers: false
,它就工作了。但我在Confluent序列化程序上找不到这样的属性。
最后,错误如下。我认为应该在notificationSerde.configure(serdeConfig, false);
中使用正确的参数,因为当我在那里更改json序列化程序时,我会看到应用程序试图转换到不同的类。但我不知道我必须在那里放置哪种配置。
由java.lang.ClassCastException引起:classcom.fasterxml.jackson.databind.node.ObjectNode不能强制转换为类com.github.felipeguterrezexplore.spring.model.Notification(com.fasterxml.jackson.databind.node.ObjectNode和com.github.felipeguterrezexplore.spring.model.Notification位于加载程序"app"的未命名模块(
我使用Serde的此配置进行了修复。
@Service
public class CustomSerdes extends Serdes {
private final static Map<String, String> serdeConfig = Stream.of(
new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
, new AbstractMap.SimpleEntry<>(FAIL_INVALID_SCHEMA, "true")
, new AbstractMap.SimpleEntry<>(JSON_VALUE_TYPE, Notification.class.getName()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
public static Serde<Notification> Notification() {
final Serde<Notification> notificationSerde = new KafkaJsonSchemaSerde<>();
notificationSerde.configure(serdeConfig, false);
return notificationSerde;
}
}
并将CCD_ 9添加到减速器中。
KStream<String, Notification> notificationJsonKStream = input
.filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
.map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationJson(v)))
.groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.Notification()))
.reduce((aggValue, newValue) -> {
newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
return newValue;
},
Named.as("notification-reducer"),
Materialized.with(CustomSerdes.String(), CustomSerdes.Notification()))
.toStream();
notificationJsonKStream.foreach((k, v) -> log.info(String.format("Notification JSON agg - key: %s, value: %s", k, v)));