Kafka Rest Proxy JSON模式验证



我想通过kafka Rest Proxy生成一个kafka主题。我已经在模式注册表中创建了一个JSON模式,我希望所有消息都根据注册的模式进行验证,如果它们与模式不匹配,就会被拒绝。

我的模式

{
"type": "object",
"properties": {
"foo": {
"type": "string",
},
"bar": {
"type": "number" 
}
}
}

此架构已正确注册并分配了版本1。然后,我尝试为foobar生成具有错误数据类型的消息,但该消息被接受。

curl --location --request POST 'http://localhost:8082/topics/test' 
--header 'Content-Type: application/vnd.kafka.jsonschema.v2+json' 
--header 'Accept: application/vnd.kafka.v2+json' 
--data-raw '{
"value_schema_id": 1,
"records": [
{
"value": {
"foo": 10,
"bar":"not a number"
}
}
]
}'

请注意,我正在为test主题生成一个相关的模式,但无论如何,错误的消息都被接受了。我还尝试添加"value_schema_id": 1以确保架构在有效负载中被引用,但错误消息仍然被接受。

但是,如果我将JSON模式作为value_schema传递,它将按预期工作

{
"value_schema": "{"type": "object","properties": {"foo": {"type": "string"},"bar": {"type": "number"}}}",
"records": [
{
"value": {
"foo": "10",
"bar": "1"
}
}
]
}

响应

{
"error_code": 42203,
"message": "Conversion of JSON to Object failed: Failed to convert JSON using JSON Schema: #/bar: expected type: Number, found: String"
}

问题:是否可以在生成消息时引用现有的模式id,而不必每次都传递整个JSON模式?

是的,这是可能的。你必须启用它,但要么值,键或两者都按照以下:

confluent.value.schema.validation=true 
confluent.key.schema.validation=true

此外,在JSON请求中,您可以指定模式的ID。以下面的例子来说明我的意思:

{
"key_schema_id": 1234,
"value_schema_id: 56789,
"records": [
{
"key": "key123",
"value": "my_simple_string_value_example"
}
]
}

其中:

key_schema_id

是架构注册表中密钥架构的ID。通过将适当的值设置为,它将保证只有具有符合该ID标识的架构的密钥的消息才会被接受。

类似:

value_schema_id

是架构注册表中值架构的ID。通过将适当的值设置为,它将保证只有值符合该ID标识的架构的消息才会被接受。

希望这能有所帮助。

干杯,

Eduardo Ponzoni

最新更新