confluent avro converter通过ID或topicName+version从模式注册表中检索模式.&l



我设置了一个kafka s3 sink连接器集群,它适用于模式名称为pattern -value的kafka主题。

工作任务可以成功地使用和反序列化来自这些主题的消息,并接收到parquet中的s3

但是,在我的环境中,并非所有主题都设计为将模式名设置为-value。一些主题共享公共模式。

其他组件依赖于消息元数据中的schemaId来从模式注册中心检索模式以进行反序列化。它们都可以很好地适用于所有主题。

我的问题是:

  • 检查源代码,我发现kafka连接器将基于模式和版本的反序列化。为什么版本很重要?如果schemaId可以在模式注册表

    中唯一标识模式
  • 似乎转换器依赖于三种命名策略而不是unique schemaId: TopicNameStrategy RecordNameStrategyTopicRecordNameStrategy是什么原因?我能想象的性能(例如topicnamestrategy),这样你就不必每条记录连接到模式注册中心,但是使用schemaId,您可以这样做通过本地缓存模式实现相同的效果

  • 什么是一个快速的解决方案,使转换器工作在我的电流上述三种策略似乎都不适合的环境用例。

主题名称模式:env.type.srcapp.data。如版本。testing.enterprise.appName.trade.v1

schema subject name that works: `testing.enterprise.appName.trade.v1-value`    
**schema subject name that does not work**: `testing.trade.schema_version`

提前感谢!

第二个问题的答案

TopicNameStrategy强制主题使用单个静态模式主题。它是{topic}-value或{topic}-key。在不同的策略可用之前,这是默认的向后兼容性目的,而不是性能。

@Override
public String subjectName(String topic, boolean isKey, ParsedSchema schema) {
return isKey ? topic + "-key" : topic + "-value";
}

RecordNameStrategy表示主题中的记录是"自我描述的"。换句话说,主题和模式以1:n的方式动态映射。

@Override
public String subjectName(String topic, boolean isKey, ParsedSchema schema) {
if (schema == null) {
return null;
}
return getRecordName(schema, isKey);
}
protected String getRecordName(ParsedSchema schema, boolean isKey) {
String name = schema.name();
if (name != null) {
return name;
}
// ... not important
}

但是RecordNameStrategy不能被一些融合平台组件使用。例如ksql。

我推荐confluent文档:https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work

如果所有模式都在本地文件中,则根本不需要连接schema-registry。

例如)

try (Producer<String, GenericRecord> producer = new KafkaProducer<>(properties)) {
var record = new GenericData.Record(schema) {{
put("name", "red brief");
put("price", 10000);
put("timestamp", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
}};
var producerRecord = new ProducerRecord<>(topic, record);
producer.send(producerRecord);
}
public String readSchema(String schemaName) {
File schemaFile = new File(SCHEMA_HOME + schemaName + ".avsc");
try (InputStream inputStream = new FileInputStream(schemaFile)) {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}

SCHEMA_HOME/schemaName美元。

希望对你有帮助。

最新更新