Kafkajs 中的 Producer.send() 日志"ERROR: [Connection] Connection timeout"但仍写入流



我对在AWS中使用Kafka和MSK相当陌生。我使用kafkajs从lambda写入到MSK集群。我的记录正在成功地写入我的Kafka集群,但我的客户端也将连接超时错误记录到CloudWatch中。我很好奇我是否可以在我的代码中做一些不同的事情来避免错误日志。

这是我的制作人代码:

const client = new Kafka({ 
clientId: "client-id", 
brokers: ["broker1:9092", "broker2:9092"],  // example brokers used here
});
const producer = client.producer({
idempotent: true
});
const record = {
topic: "topic1",
messages: [
{ value: JSON.stringify("message") }
]
};
await producer
.connect()
.then(async () => await producer.send(record))
.then(async () => await producer.disconnect())
.catch(err => throw new Error(JSON.stringify(err)));

下面是一个错误输出的例子:

{
"level": "ERROR",
"timestamp": "2022-12-05T20:44:06.637Z",
"logger": "kafkajs",
"message": "[Connection] Connection timeout",
"broker": "[some-broker]:9092",
"clientId": "[some-client-id]"
}

我不确定是否我只需要增加我的连接超时在客户端或如果我在初始化中丢失的东西。就像我说的,这个记录仍然会进入集群,但是我想清理日志,这样我就不会经常看到这个错误。有人遇到过这个问题并解决了吗?或者这是在使用MSK和kafkajs时看到的正常现象?

这不是一个令人兴奋的答案,但事实证明,其中一个代理没有正确配置我们的传输网关,以允许来自VPC的流量。这个故事的寓意是,始终检查代理端点的配置。

我在lambda中使用kafkajs通过Transit Gateway将数据从一个帐户发送到另一个帐户。代码本身按预期工作,但网关配置不正确,无法允许工作帐户的流量通过一个代理进入MSK集群。

最新更新