我有一个非常简单的代码,它应该只是在来自官方GitHub 的本地主机中使用Kafka将一个简单的消息从生产者传输到消费者
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092']
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })
const run = async () => {
// Producing
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
},
})
}
run().catch(console.error)
当我使用node index.js
(也就是这个文件(执行这个代码时,我会得到以下两个错误中的许多:
{"level":"ERROR","timestamp":"2021-03-29T12:01:00.633Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka1","broker":"kafka1:9092","clientId":"my-app","stack":"Error: getaddrinfo ENOTFOUND kafka1n at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:60:26)"}
{"level":"ERROR","timestamp":"2021-03-29T12:01:00.635Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: getaddrinfo ENOTFOUND kafka1","retryCount":0,"retryTime":323}
所以,我想,在本地端口9092上定义代理的第一行似乎不起作用?但我不知道该怎么纠正。你能帮我吗?
感谢OneCricketer,我发现我没有正确启动本地kafka服务器,所以第一行无法工作。
非常感谢他!