我正在使用kafka 1.1和kafka rets代理4.1.2。我一直使用字符串键从Kafka流中的内部键入记录。我想使用REST代理来插入要连接的记录,但是钥匙在其周围放置了逃脱的报价标记。
我正在使用Content-Type: application/vnd.kafka.json.v2+json
向/topics/{someTopic}
发送POST
请求,这会导致问题。
使用 Content-Type: application/vnd.kafka.avro.v2+json
和key_schema类型:字符串,钥匙周围没有额外的引号,但我宁愿发送JSON值。
这是我发送到/topics
端点的内容。
{
"records": [
{
"key": "abc",
"value": {"animal": "dog"}
}
]
}
当我在kafka流中流式传输数据时,键将以"abc"
出现,显然没有与String Keys abc
一起使用记录。
是否有一种方法可以指定具有JSON值的密钥模式,以便我的钥匙不会在它们周围得到逃脱的报价标记?
使用Content-Type: application/vnd.kafka.json.v2+json
标头时,JSON键将绕所有字符串的引号逃脱,以便在Streams应用程序中正确挑选。使用简单键时,数字键未修改时,字符串似乎确实被引用了。
Content-Type: application/vnd.kafka.binary.v2+json
将按照您的赋予它们成对产生您的钥匙值对,而无需向字符串键添加Esc的引号。您只需要base64编码键和值。
您的示例身体变为:
{
"records": [{
"key": "YWJj",
"value": "eyJhbmltYWwiOiJkb2cifQ=="
}]
}
根据kafka docs,消息的格式对JSON是正确的。我认为,您应该尝试以下消息标头。
Content-Type: application/vnd.kafka.json.v2+json
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json
我建议您通过以下Kafka文档。
https://docs.confluent.io/current/kafka-rest/api.html