我最近开始探索Kafka和Kafka连接,并做了一些初步设置。 但想在模式注册表部分探索更多内容。
我的模式注册表现在启动了,我应该怎么做.
我有一个存储在 avro_schema.avsc 中的 AVRO 架构。
这是架构
{
"name": "FSP-AUDIT-EVENT",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
"name": "ID",
"type": "string"
},
{
"name": "VERSION",
"type": "int"
},
{
"name": "ACTION_TYPE",
"type": "string"
},
{
"name": "EVENT_TYPE",
"type": "string"
},
{
"name": "CLIENT_ID",
"type": "string"
},
{
"name": "DETAILS",
"type": "string"
},
{
"name": "OBJECT_TYPE",
"type": "string"
},
{
"name": "UTC_DATE_TIME",
"type": "long"
},
{
"name": "POINT_IN_TIME_PRECISION",
"type": "string"
},
{
"name": "TIME_ZONE",
"type": "string"
},
{
"name": "TIMELINE_PRECISION",
"type": "string"
},
{
"name": "AUDIT_EVENT_TO_UTC_DT",
"type": [
"string",
"null"
]
},
{
"name": "AUDIT_EVENT_TO_DATE_PITP",
"type": "string"
},
{
"name": "AUDIT_EVENT_TO_DATE_TZ",
"type": "string"
},
{
"name": "AUDIT_EVENT_TO_DATE_TP",
"type": "string"
},
{
"name": "GROUP_ID",
"type": "string"
},
{
"name": "OBJECT_DISPLAY_NAME",
"type": "string"
},
{
"name": "OBJECT_ID",
"type": [
"string",
"null"
]
},
{
"name": "USER_DISPLAY_NAME",
"type": [
"string",
"null"
]
},
{
"name": "USER_ID",
"type": "string"
},
{
"name": "PARENT_EVENT_ID",
"type": [
"string",
"null"
]
},
{
"name": "NOTES",
"type": [
"string",
"null"
]
},
{
"name": "SUMMARY",
"type": [
"string",
"null"
]
}
]
}
我的架构是否有效。我从 JSON 在线转换了它? 我应该在哪里保存这个架构文件位置,我不确定. 请指导我遵循的步骤 . 我正在从 Lambda 函数和 JDBC 源发送记录。
那么基本上我该如何强制执行AVRO模式并进行测试? 我必须更改 avro-消费者属性文件中的任何内容吗?
或者这是注册架构的正确方法
./bin/kafka-avro-console-producer
--broker-list b-3.**:9092,b-**:9092,b-**:9092 --topic AVRO-AUDIT_EVENT
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "{"type":"struct","fields":[{"type":"string","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"VERSION"},{"type":"string","optional":true,"field":"ACTION_TYPE"},{"type":"string","optional":true,"field":"EVENT_TYPE"},{"type":"string","optional":true,"field":"CLIENT_ID"},{"type":"string","optional":true,"field":"DETAILS"},{"type":"string","optional":true,"field":"OBJECT_TYPE"},{"type":"string","optional":true,"field":"UTC_DATE_TIME"},{"type":"string","optional":true,"field":"POINT_IN_TIME_PRECISION"},{"type":"string","optional":true,"field":"TIME_ZONE"},{"type":"string","optional":true,"field":"TIMELINE_PRECISION"},{"type":"string","optional":true,"field":"GROUP_ID"},{"type":"string","optional":true,"field":"OBJECT_DISPLAY_NAME"},{"type":"string","optional":true,"field":"OBJECT_ID"},{"type":"string","optional":true,"field":"USER_DISPLAY_NAME"},{"type":"string","optional":true,"field":"USER_ID"},{"type":"string","optional":true,"field":"PARENT_EVENT_ID"},{"type":"string","optional":true,"field":"NOTES"},{"type":"string","optional":true,"field":"SUMMARY"},{"type":"string","optional":true,"field":"AUDIT_EVENT_TO_UTC_DT"},{"type":"string","optional":true,"field":"AUDIT_EVENT_TO_DATE_PITP"},{"type":"string","optional":true,"field":"AUDIT_EVENT_TO_DATE_TZ"},{"type":"string","optional":true,"field":"AUDIT_EVENT_TO_DATE_TP"}],"optional":false,"name":"test"}"}' http://localhost:8081/subjects/view/versions
接下来我要做什么
但是当我尝试查看我的架构时,我只得到下面
curl --silent -X GET http://localhost:8081/subjects/AVRO-AUDIT-EVENT/versions/latest
这是结果
{"subject":"AVRO-AUDIT-EVENT","version":1,"id":161,"schema":"{"type":"string","optional":false}"}
为什么我看不到完整注册的架构
另外,当我尝试删除架构时
我得到下面的错误
{"error_code":405,"message":"HTTP 405 Method Not Allowed"
我不确定我的架构是否正确注册。
请帮助我。 提前致谢
我的架构是否有效
您可以使用注册表的 REST API 尝试提交它并查看...
我应该在哪里保存这个架构文件位置,我不确定
目前尚不清楚您是如何发送消息的...
如果你真的写了Kafka生产者代码,你可以把它存储在你的代码中(作为一个字符串(或一个资源文件。如果使用 Java,则可以改用 SchemaBuilder 类来创建 Schema 对象
您需要重写生产者以使用 Avro 架构和序列化程序(如果尚未编写(
如果我们创建AVRO模式,它也适用于Json。
Avro是一种二进制格式,但有一个JSONDecoder。
我们的 AVRO 架构属性文件的 URL 应该是什么?
一旦你弄清楚如何启动它,它必须是你的架构注册表的IP。(带schema-registry-start
(
我必须更改 avro-消费者属性文件中的任何内容吗?
您需要使用阿夫罗反序列化器
这是注册架构的正确方法
.>/bin/kafka-avro-console-producer
差一点。这就是生成具有架构的消息的方式(并且您需要使用正确的架构(。您还必须提供--property schema.registry.url
使用注册表的 REST API 注册和验证架构