我想通过kafka Rest Proxy生成一个kafka主题。我已经在模式注册表中创建了一个JSON模式,我希望所有消息都根据注册的模式进行验证,如果它们与模式不匹配,就会被拒绝。
我的模式
{
"type": "object",
"properties": {
"foo": {
"type": "string",
},
"bar": {
"type": "number"
}
}
}
此架构已正确注册并分配了版本1。然后,我尝试为foo
和bar
生成具有错误数据类型的消息,但该消息被接受。
curl --location --request POST 'http://localhost:8082/topics/test'
--header 'Content-Type: application/vnd.kafka.jsonschema.v2+json'
--header 'Accept: application/vnd.kafka.v2+json'
--data-raw '{
"value_schema_id": 1,
"records": [
{
"value": {
"foo": 10,
"bar":"not a number"
}
}
]
}'
请注意,我正在为test
主题生成一个相关的模式,但无论如何,错误的消息都被接受了。我还尝试添加"value_schema_id": 1
以确保架构在有效负载中被引用,但错误消息仍然被接受。
但是,如果我将JSON模式作为value_schema
传递,它将按预期工作
{
"value_schema": "{"type": "object","properties": {"foo": {"type": "string"},"bar": {"type": "number"}}}",
"records": [
{
"value": {
"foo": "10",
"bar": "1"
}
}
]
}
响应
{
"error_code": 42203,
"message": "Conversion of JSON to Object failed: Failed to convert JSON using JSON Schema: #/bar: expected type: Number, found: String"
}
问题:是否可以在生成消息时引用现有的模式id,而不必每次都传递整个JSON模式?
是的,这是可能的。你必须启用它,但要么值,键或两者都按照以下:
confluent.value.schema.validation=true
confluent.key.schema.validation=true
此外,在JSON请求中,您可以指定模式的ID。以下面的例子来说明我的意思:
{
"key_schema_id": 1234,
"value_schema_id: 56789,
"records": [
{
"key": "key123",
"value": "my_simple_string_value_example"
}
]
}
其中:
key_schema_id
是架构注册表中密钥架构的ID。通过将适当的值设置为,它将保证只有具有符合该ID标识的架构的密钥的消息才会被接受。
类似:
value_schema_id
是架构注册表中值架构的ID。通过将适当的值设置为,它将保证只有值符合该ID标识的架构的消息才会被接受。
希望这能有所帮助。
干杯,
Eduardo Ponzoni