如何使用 Kafka Connect AVRO 的模式注册表



我最近开始探索Kafka和Kafka连接,并做了一些初步设置。 但想在模式注册表部分探索更多内容。

我的模式注册表现在启动了,我应该怎么做.

我有一个存储在 avro_schema.avsc 中的 AVRO 架构。

这是架构

{
"name": "FSP-AUDIT-EVENT",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
"name": "ID",
"type": "string"
},
{
"name": "VERSION",
"type": "int"
},
{
"name": "ACTION_TYPE",
"type": "string"
},
{
"name": "EVENT_TYPE",
"type": "string"
},
{
"name": "CLIENT_ID",
"type": "string"
},
{
"name": "DETAILS",
"type": "string"
},
{
"name": "OBJECT_TYPE",
"type": "string"
},
{
"name": "UTC_DATE_TIME",
"type": "long"
},
{
"name": "POINT_IN_TIME_PRECISION",
"type": "string"
},
{
"name": "TIME_ZONE",
"type": "string"
},
{
"name": "TIMELINE_PRECISION",
"type": "string"
},
{
"name": "AUDIT_EVENT_TO_UTC_DT",
"type": [
"string",
"null"
]
},
{
"name": "AUDIT_EVENT_TO_DATE_PITP",
"type": "string"
},
{
"name": "AUDIT_EVENT_TO_DATE_TZ",
"type": "string"
},
{
"name": "AUDIT_EVENT_TO_DATE_TP",
"type": "string"
},
{
"name": "GROUP_ID",
"type": "string"
},
{
"name": "OBJECT_DISPLAY_NAME",
"type": "string"
},
{
"name": "OBJECT_ID",
"type": [
"string",
"null"
]
},
{
"name": "USER_DISPLAY_NAME",
"type": [
"string",
"null"
]
},
{
"name": "USER_ID",
"type": "string"
},
{
"name": "PARENT_EVENT_ID",
"type": [
"string",
"null"
]
},
{
"name": "NOTES",
"type": [
"string",
"null"
]
},
{
"name": "SUMMARY",
"type": [
"string",
"null"
]
}
]
}

我的架构是否有效。我从 JSON 在线转换了它? 我应该在哪里保存这个架构文件位置,我不确定. 请指导我遵循的步骤 . 我正在从 Lambda 函数和 JDBC 源发送记录。

那么基本上我该如何强制执行AVRO模式并进行测试? 我必须更改 avro-消费者属性文件中的任何内容吗?

或者这是注册架构的正确方法

./bin/kafka-avro-console-producer 
--broker-list b-3.**:9092,b-**:9092,b-**:9092 --topic AVRO-AUDIT_EVENT 
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'


curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json"     --data '{"schema" : "{"type":"struct","fields":[{"type":"string","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"VERSION"},{"type":"string","optional":true,"field":"ACTION_TYPE"},{"type":"string","optional":true,"field":"EVENT_TYPE"},{"type":"string","optional":true,"field":"CLIENT_ID"},{"type":"string","optional":true,"field":"DETAILS"},{"type":"string","optional":true,"field":"OBJECT_TYPE"},{"type":"string","optional":true,"field":"UTC_DATE_TIME"},{"type":"string","optional":true,"field":"POINT_IN_TIME_PRECISION"},{"type":"string","optional":true,"field":"TIME_ZONE"},{"type":"string","optional":true,"field":"TIMELINE_PRECISION"},{"type":"string","optional":true,"field":"GROUP_ID"},{"type":"string","optional":true,"field":"OBJECT_DISPLAY_NAME"},{"type":"string","optional":true,"field":"OBJECT_ID"},{"type":"string","optional":true,"field":"USER_DISPLAY_NAME"},{"type":"string","optional":true,"field":"USER_ID"},{"type":"string","optional":true,"field":"PARENT_EVENT_ID"},{"type":"string","optional":true,"field":"NOTES"},{"type":"string","optional":true,"field":"SUMMARY"},{"type":"string","optional":true,"field":"AUDIT_EVENT_TO_UTC_DT"},{"type":"string","optional":true,"field":"AUDIT_EVENT_TO_DATE_PITP"},{"type":"string","optional":true,"field":"AUDIT_EVENT_TO_DATE_TZ"},{"type":"string","optional":true,"field":"AUDIT_EVENT_TO_DATE_TP"}],"optional":false,"name":"test"}"}' http://localhost:8081/subjects/view/versions

接下来我要做什么

但是当我尝试查看我的架构时,我只得到下面

curl --silent -X GET http://localhost:8081/subjects/AVRO-AUDIT-EVENT/versions/latest

这是结果

{"subject":"AVRO-AUDIT-EVENT","version":1,"id":161,"schema":"{"type":"string","optional":false}"}

为什么我看不到完整注册的架构

另外,当我尝试删除架构时

我得到下面的错误

{"error_code":405,"message":"HTTP 405 Method Not Allowed"

我不确定我的架构是否正确注册。

请帮助我。 提前致谢

我的架构是否有效

您可以使用注册表的 REST API 尝试提交它并查看...

应该在哪里保存这个架构文件位置,我不确定

目前尚不清楚您是如何发送消息的...

如果你真的写了Kafka生产者代码,你可以把它存储在你的代码中(作为一个字符串(或一个资源文件。如果使用 Java,则可以改用 SchemaBuilder 类来创建 Schema 对象

您需要重写生产者以使用 Avro 架构和序列化程序(如果尚未编写(

如果我们创建AVRO模式,它也适用于Json。

Avro是一种二进制格式,但有一个JSONDecoder。

我们的 AVRO 架构属性文件的 URL 应该是什么?

一旦你弄清楚如何启动它,它必须是你的架构注册表的IP。(带schema-registry-start(

我必须更改 avro-消费者属性文件中的任何内容吗?

您需要使用阿夫罗反序列化器

这是注册架构的正确方法

.>/bin/kafka-avro-console-producer

差一点。这就是生成具有架构的消息的方式(并且您需要使用正确的架构(。您还必须提供--property schema.registry.url

使用注册表的 REST API 注册和验证架构

相关内容

  • 没有找到相关文章

最新更新