使用 Kafka schema-registry API 注册新的 avro 模式



我正在尝试使用kafka-schema-registeryAPI创建一个新schema。 以下是实现:

let value = JSON.stringify(avroSchema);
let type= {"schema" : value}; 
fetch(`${process.env.SCHEMA_REGISTRY_URL}/subjects/${topic}/versions`,
{ 
body : type,  
method : 'POST', 
headers :{ 'Content-Type': 'application/vnd.schemaregistry.v1+json,
application/vnd.schemaregistry+json, application/json',
'Accept' : 'application/vnd.schemaregistry.v1+json,
application/vnd.schemaregistry+json, application/json'            
}
})
.then(res=>res.json())
.then((result)=>{
console.log('result is ', result);   
resolve(result);    
})
.catch((err)=>{
console.log('err',err);
reject(err);
})

以下是avroSchema的外观:

const avroSchema = {
"type": "record",
"name": "test",
"fields" : [
{"name": "field", "type": "long"},
]
};

当我执行此代码时,我得到500 - Internal server error.

谁能帮助我了解我哪里出了问题?

对于未来的用户:

这是我能够解决这个问题的方法:

const payload = {
"schema": JSON.stringify(avroSchema)
}; 

最后,设置有效负载后,options发出POST请求,如下所示:

const options = {
method: 'POST',
url: `${process.env.SCHEMA_REGISTRY_URL}/subjects/${topicName}-value/versions`,
headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' },
body: payload,
json: true
}

然后提出请求:

request(options, function (error, response, body) {
if (error) {
reject(error);
}                   
resolve(body)
});

最新更新