为什么我的 kafkajs 客户端 (Node.js/express.js) 在获取主题元数据时抛出"TypeError: topics.forEach 不是一个函数"?



我正在尝试使用 kafkajs 管理客户端获取我的 kafka 代理主题的元数据。我已经用 Node.js + express.js 编写了我的服务器。

这是我index.js文件,它是 npm 的入口点。

'use strict';
const express = require('express');
const bodyParser = require('body-parser');
const Admin = require('./Admin/create-admin-client');
const TopicMetaData = require('./Admin/topic-metadata-fetch');
const app = express();
app.use(bodyParser.urlencoded({ extended: false }));
app.use(bodyParser.json());
const adminConfig = new Admin({
clientId: 'admin-client-4981',
brokers: ['localhost:9092']
})
const admin = adminConfig.getAdmin();


.
..
...
// Handles the admin connection, disconnection, and other routes here
...
..
.


//This is where the error is
app.post('/api/v1/dev/admin/topicmetadata', (req, res) => {
const topic = req.body;
const topicmetadata = new TopicMetaData(admin);
topicmetadata.setTopicConfig(topic);
topicmetadata.commit(req, res);
});
app.listen(4040);

这是检索管理对象的create-admin-client.js文件。

'use strict';
const { Kafka } = require('kafkajs');
class Admin {
constructor(kafkaConfig) {
this.kafka = new Kafka({
clientId: kafkaConfig.clientId,
brokers: kafkaConfig.brokers
});
}
getAdmin() {
return this.kafka.admin();
};
}
module.exports = Admin;

这是topics-metadata-fetch.js文件,用于获取主题的元数据。

'use strict';
class TopicMetaData {
constructor(admin) {
this.admin = admin;
}
setTopicConfig(topicConfig) {
this.topic = topicConfig;
}
commit(req, res) {
this.admin.fetchTopicMetadata({
topics: this.topic
})
.then((topics) => {
console.log("Topic MetaData Fetched Successfully!");
res.status(200).send({
topics
});
})
.catch((err) => {
console.error(err);
res.status(500).send(err);
})
}
}
module.exports = TopicMetaData;

每当我发送 POST 请求以获取主题的元数据(例如"服务类型"时,我已成功创建主题(,req.body

{
"topic": "SERVICE-TYPES",
"partitions": [{
"partitionErrorCode": 0,
"partitionId": 0,
"leader": 0,
"replicas": [0],
"isr": [0]
}]
}

它返回TypeError: topics.forEach is not a function错误。 我哪里做错了?

我发现我的http请求正文应该是以下结构:

{
"topics": [{
"topic": "SERVICE_TYPES",
"partitions": [{
"partitionErrorCode": 0,
"partitionId": 0,
"leader": 0,
"replicas": [0],
"isr": [0]
}]
}]
}

我的路由处理程序应该是这样的:

app.get('/api/v1/dev/admin/topicmetadata', (req, res) => {
//topic should be array of the topics
const topic = req.body.topics;
const topicmetadata = new TopicMetaData(admin);
topicmetadata.setTopicConfig(topic);
topicmetadata.commit(req, res);
});

所以我传递了错误的请求正文结构。

相关内容

  • 没有找到相关文章

最新更新