How do I get kafka-json-schema-serializer to use Jackson instead of a binary object in a Supplier function?



Using the Spring Boot Cloud Stream Kafka binder with Schema Registry to produce and consume JSON through a Supplier/Consumer pair results in the payload being encoded as a binary object.

This code

@Bean
public Supplier<PaymentEvent> produceEvents() {
    return () -> {
        PaymentEvent paymentEvent = new PaymentEvent();
        paymentEvent.name = "xxxxx";
        log.info("paymentEvent.name: " + paymentEvent.name);
        return paymentEvent;
    };
}

when used with

spring:
  cloud:
    stream:
      bindings:
        produceEvents-out-0:
          destination: paymentevent-connectid
          binder: kafka
          group: paymentevent-group
      kafka:
        bindings:
          produceEvents-out-0:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
                schema.registry.url: http://confluent-schema-registry:8082
        binder:
          brokers: eventcluster-kafka-bootstrap:9093
          configuration:
            security.protocol: SSL
            ssl.truststore.location: classpath:/kafka/tls/eventcluster-cluster-ca-cert-ca.p12
            ssl.truststore.password: xxxxx
            ssl.truststore.type: PKCS12
            ssl.keystore.location: classpath:/kafka/tls/paymentevent.p12
            ssl.keystore.password: xxxx
            ssl.keystore.type: PKCS12

produces the following JSON Schema

i.c.k.s.client.rest.RestService          Sending POST with input {"schemaType":"JSON","schema":"{"$schema":"http://json-schema.org/draft-07/schema#","title":"Byte []","type":"array","items":{"type":"integer"}}"} to http://confluent-schema-registry:8082/subjects/paymentevent-connectid-value/versions?normalize=false

If I code the producer directly

// Plain Kafka producer, bypassing Spring Cloud Stream entirely
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "eventcluster-kafka-bootstrap:9093");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
props.put("security.protocol", "SSL");
// Backslashes in Windows-style paths must be escaped in Java string literals
props.put("ssl.truststore.location",
        "kafka\\tls\\eventcluster-cluster-ca-cert-ca.p12");
props.put("ssl.truststore.password", "x");
props.put("ssl.truststore.type", "PKCS12");
props.put("ssl.keystore.location",
        "kafka\\tls\\paymentevent.p12");
props.put("ssl.keystore.password", "xxxx");
props.put("ssl.keystore.type", "PKCS12");
props.put("schema.registry.url", "http://127.0.0.1:8082");

Producer<String, PaymentEvent> producer = new KafkaProducer<String, PaymentEvent>(props);
String topic = "paymentevent-connectid";
String key = "1";
PaymentEvent paymentEvent = new PaymentEvent();
paymentEvent.name = "xxxx";
ProducerRecord<String, PaymentEvent> record = new ProducerRecord<String, PaymentEvent>(topic, key,
        paymentEvent);
// Block until the broker acknowledges the record, then release resources
producer.send(record).get();
producer.close();

it produces

io.confluent.kafka.schemaregistry.client.rest.RestService - Sending POST with input {"schemaType":"JSON","schema":"{"$schema":"http://json-schema.org/draft-07/schema#","title":"Payment Event","type":"object","additionalProperties":false,"properties":{"name":{"oneOf":[{"type":"null","title":"Not included"},{"type":"string"}]},"signedreqattr.claims":{"oneOf":[{"type":"null","title":"Not included"},{"$ref":"#/definitions/SignedreqattrClaims"}]}},"definitions":{"SignedreqattrClaims":{"type":"object","additionalProperties":false,"properties":{"id_token":{"oneOf":[{"type":"null","title":"Not included"},{"$ref":"#/definitions/IdToken"}]}}},"IdToken":{"type":"object","additionalProperties":false,"properties":{"birthdate":{"oneOf":[{"type":"null","title":"Not included"},{"$ref":"#/definitions/Birthdate"}]},"address":{"oneOf":[{"type":"null","title":"Not included"},{"$ref":"#/definitions/Address"}]},"name":{"oneOf":[{"type":"null","title":"Not included"},{"$ref":"#/definitions/Name"}]},"phone_number":{"oneOf":[{"type":"null","title":"Not included"},{"$ref":"#/definitions/PhoneNumber"}]},"given_name":{"oneOf":[{"type":"null","title":"Not included"},{"$ref":"#/definitions/GivenName"}]},"family_name":{"oneOf":[{"type":"null","title":"Not included"},{"$ref":"#/definitions/FamilyName"}]},"email":{"oneOf":[{"type":"null","title":"Not included"},{"$ref":"#/definitions/Email"}]}}},"Birthdate":{"type":"object","additionalProperties":false,"properties":{"essential":{"oneOf":[{"type":"null","title":"Not included"},{"type":"boolean"}]}}},"Address":{"type":"object","additionalProperties":false,"properties":{"essential":{"oneOf":[{"type":"null","title":"Not included"},{"type":"boolean"}]}}},"Name":{"type":"object","additionalProperties":false,"properties":{"essential":{"oneOf":[{"type":"null","title":"Not included"},{"type":"boolean"}]}}},"PhoneNumber":{"type":"object","additionalProperties":false,"properties":{"essential":{"oneOf":[{"type":"null","title":"Not included"},{"type":"boolean"}]}}},"GivenName":{"type":"object","additionalProperties":false,"properties":{"essential":{"oneOf":[{"type":"null","title":"Not included"},{"type":"boolean"}]}}},"FamilyName":{"type":"object","additionalProperties":false,"properties":{"essential":{"oneOf":[{"type":"null","title":"Not included"},{"type":"boolean"}]}}},"Email":{"type":"object","additionalProperties":false,"properties":{"essential":{"oneOf":[{"type":"null","title":"Not included"},{"type":"boolean"}]}}}}}"} to http://127.0.0.1:8082/subjects/paymentevent-connectid-value/versions?normalize=false

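It seems that using a Supplier converts the payload to binary before it reaches the serializer, which causes problems for the consumer.

Any ideas what I am doing wrong?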


I found that I needed to add

PP_7
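
For readers hitting the same symptom: a "Byte []" schema of the kind shown in the first log is what one would expect if Spring Cloud Stream had already converted the PaymentEvent to a byte array with its own message converters before handing it to the Kafka serializer. A setting that commonly addresses this is the producer's native encoding flag, which tells the framework to pass the object through untouched so that KafkaJsonSchemaSerializer receives the PaymentEvent itself. The fragment below is a sketch under that assumption and is not necessarily the snippet the author added; the binding name is taken from the configuration in the question.

```yaml
spring:
  cloud:
    stream:
      bindings:
        produceEvents-out-0:
          producer:
            # Assumption: skip the framework's own payload conversion and let
            # the configured Kafka value.serializer handle the PaymentEvent
            useNativeEncoding: true
```

If the consuming side uses a Consumer function with a Confluent deserializer, the matching consumer flag (useNativeDecoding on the input binding) may be needed as well.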
