auto.register.schemas=false
无法按我的预期工作。
如果我阅读了文档,那么我想让生产者重新制定新的模式。
https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-模式注册
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-预注册模式和客户端派生模式之间的差异
schema-registry.properties
# listeners = PLAINTEXT://your.host.name:9092
listeners=http://0.0.0.0:8081
# Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the leader schema registry instance and for storing the data for
# registered schemas.
kafkastore.bootstrap.servers=192.168.16.192:9092,192.168.16.191:9092
# The name of the topic to store schemas in
kafkastore.topic=_schemas
# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
auto.register.schemas=false
use.latest.version=true
为了确保以前的一些设置不会持久化,我每次尝试都删除了_schemas主题,并更改了名称。
但是,每次我发出一些卡夫卡信息时,我都会看到一个名叫<主题名称>-值被注册。
我不明白为什么。
唯一有效的方法是当我添加:
props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
然后我有错误检索Avro架构。
但是,如果auto.register.schemas属性设置为false,我也应该有这个错误。对吗?
我的生产者代码是:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-address>");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");
props.put("schema.registry.url", "<schema-registry>");
#props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
KafkaProducer<String, ClientOrderRequest> producerRequest = new KafkaProducer<>(props);
ClientOrderRequest clientOrderRequest = createClientOrderRequest();
final ProducerRecord<String, ClientOrderRequest> producerOrderRequest = new ProducerRecord<>("client-order-request",
"ClientOrderRequest-" + calendar.getTimeInMillis(), clientOrderRequest);
producerRequest.send(producerOrderRequest);
我的错误是:
auto.register.schemas=false
use.latest.version=true
是kafka客户端属性,而不是架构注册表属性。这些需要设置在制片人一侧。