是什么导致Node Kafka Streams出现这种间歇性问题



我有一个卡夫卡生产者和消费者。

生产者这样做:

const returnMessage = {
prop1: 'some string',
prop2: 'another string',
prop3: nestedObject
};
console.log(JSON.stringify(returnMessage))
await stream.writeToStream(JSON.stringify(returnMessage));

消费者这样做:

incomingStream.forEach(
message => {
console.log(message.value)
let messageObject = message.value;
...other stuff...
}
);

现在,在生产者端,返回消息总是作为一个适当的字符串记录,一切都很好。但在消费者端,一开始,message.value是一个合适的字符串,可以从中解析JSON,但在随后的请求中,它会显示为"[object object]"。如果

我觉得我错过了一些关键的东西。。。如果你有任何见解,请帮忙。

好的,我明白了。incomingStream.forEach发生在路由内部,而流正在控制器级别实例化。我通过将forEach移动到控制器级别来修复它,并让它在每条消息上发出解析的消息,然后订阅路由内的事件。

最新更新