我正在尝试使用 kafkajs 的 admin(管理)客户端获取 Kafka broker 上主题的元数据。我的服务器是用 Node.js + Express 编写的。
这是我的 index.js 文件,它是应用的入口文件(npm 的入口点)。
'use strict';
const express = require('express');
const bodyParser = require('body-parser');
const Admin = require('./Admin/create-admin-client');
const TopicMetaData = require('./Admin/topic-metadata-fetch');
// Express application; both URL-encoded form bodies and JSON bodies are parsed
// before any route handler runs.
const app = express();
app.use(
  bodyParser.urlencoded({ extended: false }),
  bodyParser.json()
);

// Wrapper around the kafkajs client for the local broker. One admin client is
// taken from it here and reused by the route handlers in this file.
const adminConfig = new Admin({
  brokers: ['localhost:9092'],
  clientId: 'admin-client-4981',
});
const admin = adminConfig.getAdmin();
.
..
...
// Handles the admin connection, disconnection, and other routes here
...
..
.
// This is where the error is.
// Route: look up topic metadata using the parsed request body.
app.post('/api/v1/dev/admin/topicmetadata', (req, res) => {
  const metadataFetcher = new TopicMetaData(admin);
  // The whole parsed body is handed over as the topic configuration.
  metadataFetcher.setTopicConfig(req.body);
  // commit() writes the HTTP response itself (200 on success, 500 on failure).
  metadataFetcher.commit(req, res);
});

// Start the HTTP server on port 4040.
app.listen(4040);
这是 create-admin-client.js 文件,用于创建 Kafka 客户端并返回 admin(管理)客户端对象。
'use strict';
const { Kafka } = require('kafkajs');
/**
 * Small wrapper that builds a kafkajs client from a minimal configuration
 * and hands out admin clients created from it.
 */
class Admin {
  /**
   * @param {object} kafkaConfig - Connection settings. Only `clientId` and
   *   `brokers` are read and forwarded to the Kafka constructor; any other
   *   keys are ignored.
   */
  constructor(kafkaConfig) {
    const { clientId, brokers } = kafkaConfig;
    this.kafka = new Kafka({ clientId, brokers });
  }

  /**
   * @returns {object} The result of calling `admin()` on the wrapped client.
   */
  getAdmin() {
    const adminClient = this.kafka.admin();
    return adminClient;
  }
}
module.exports = Admin;
这是 topic-metadata-fetch.js 文件(即 index.js 中 require 的 './Admin/topic-metadata-fetch'),用于获取主题的元数据。
'use strict';
/**
 * Fetches Kafka topic metadata through an admin client and writes the
 * outcome to an Express-style response object.
 */
class TopicMetaData {
  /**
   * @param {object} admin - Admin client exposing
   *   `fetchTopicMetadata({ topics })` that returns a promise.
   */
  constructor(admin) {
    this.admin = admin;
  }

  /**
   * Stores the value that `commit` will send as `topics`.
   *
   * @param {Array} topicConfig - List of topics to look up.
   *   NOTE(review): the kafkajs docs describe the elements as topic-name
   *   strings - confirm the element shape against the kafkajs version in use.
   */
  setTopicConfig(topicConfig) {
    this.topic = topicConfig;
  }

  /**
   * Requests metadata for the stored topics and answers the HTTP request:
   * 400 when the stored topics are not an array, 200 with `{ topics }` on
   * success, 500 with the error when the admin call fails.
   *
   * @param {object} req - Incoming request (not read here).
   * @param {object} res - Response object with chainable `status()` and `send()`.
   * @returns {Promise<void>} Settles once the response has been written.
   */
  commit(req, res) {
    // A non-array here makes the admin client fail with the opaque
    // "topics.forEach is not a function"; reject it as a client error instead.
    if (!Array.isArray(this.topic)) {
      res.status(400).send({
        error: '`topics` must be an array'
      });
      return Promise.resolve();
    }

    return this.admin.fetchTopicMetadata({
      topics: this.topic
    })
      .then((topics) => {
        console.log("Topic MetaData Fetched Successfully!");
        res.status(200).send({
          topics
        });
      })
      .catch((err) => {
        console.error(err);
        res.status(500).send(err);
      });
  }
}
module.exports = TopicMetaData;
每当我发送 POST 请求来获取某个主题(例如我已经成功创建的 "SERVICE-TYPES" 主题)的元数据时,req.body
为
{
"topic": "SERVICE-TYPES",
"partitions": [{
"partitionErrorCode": 0,
"partitionId": 0,
"leader": 0,
"replicas": [0],
"isr": [0]
}]
}
它返回 TypeError: topics.forEach is not a function 错误。我哪里做错了?
我发现我的 HTTP 请求正文应该是以下结构(topics 必须是一个数组):
{
"topics": [{
"topic": "SERVICE_TYPES",
"partitions": [{
"partitionErrorCode": 0,
"partitionId": 0,
"leader": 0,
"replicas": [0],
"isr": [0]
}]
}]
}
我的路由处理程序应该是这样的:
// Route: fetch metadata for the topics listed in the request body.
// Registered with POST because the handler reads a request body and the
// request sent to this endpoint is a POST; a GET registration would never
// match it.
app.post('/api/v1/dev/admin/topicmetadata', (req, res) => {
  // `topics` must be an array: fetchTopicMetadata iterates it with forEach.
  const topic = req.body.topics;
  const topicmetadata = new TopicMetaData(admin);
  topicmetadata.setTopicConfig(topic);
  // commit() writes the HTTP response itself.
  topicmetadata.commit(req, res);
});
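所以问题在于我传递了错误的请求正文结构:我原先把整个 req.body(一个对象)当作 topics 传给了 fetchTopicMetadata,而它会对 topics 调用 forEach,因此 topics 必须是数组,否则就会抛出 "topics.forEach is not a function"。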