我对整个微服务体系结构还很陌生,所以我一直在使用Kafka来检索数据并向微服务发送数据。我很清楚如何通过普通消费者和生产者发送和检索数据(尽管我不是这方面的专家(,但我最近了解了Kafka Streams,并想用它来简化我正在消费的数据。我可以从另一个微服务收集数据,但我注意到,如果我终止进程并再次运行它,我会拿回数据,再加上它下面的另一个相同数据副本。如果我终止该进程并再次执行它,我将在最后一个副本下面有另一段重复数据!即使我停止运行提供数据的其他微服务,我也可以收集数据,所以我假设数据会保存在某个地方。如果我终止它一次并再次运行,它会是什么样子。。。
TOPIC: requestAllUserData
[kafka-producer -> requestAllUserData]: broker update success
[
{
id: 1,
first_name: 'John',
last_name: 'Doe',
city: 'Northridge',
age: 25,
gender: 'Male',
profession: 'Teacher',
email: 'johntho213@gmail.com',
username: 'JohnTho213',
created_at: '06-05-2019',
deleted_at: '09-29-2020'
},
{
id: 2,
first_name: 'Mike',
last_name: 'Brown',
city: 'Topanga',
age: 19,
gender: 'Male',
profession: 'Senator',
email: 'mikebrown@gmail.com',
username: 'MBrownYe',
created_at: '07-04-18',
deleted_at: null
}
]
[
{
id: 1,
first_name: 'John',
last_name: 'Doe',
city: 'Northridge',
age: 25,
gender: 'Male',
profession: 'Teacher',
email: 'johntho213@gmail.com',
username: 'JohnTho213',
created_at: '06-05-2019',
deleted_at: '09-29-2020'
},
{
id: 2,
first_name: 'Mike',
last_name: 'Brown',
city: 'Topanga',
age: 19,
gender: 'Male',
profession: 'Senator',
email: 'mikebrown@gmail.com',
username: 'MBrownYe',
created_at: '07-04-18',
deleted_at: null
}
]
正如你所看到的,我收到了两次有效载荷,我只想看一次。有人知道这种意外行为的可能原因吗?我遵循了此处的文档->https://nodefluent.github.io/kafka-streams/docs/
我已经包含了我的代码,它与下面的文档中的代码差别很小
"use strict";
const { KafkaStreams } = require("kafka-streams");
const { nativeConfig: config } = require("./KSConfig.js");
const kafkaStreams = new KafkaStreams(config);
const stream = kafkaStreams.getKStream();
stream
.from("AllUserDataResponse")
.forEach(message => console.log(JSON.parse(message.value)));
function streamTest(){
stream.start().then(() => {
console.log("stream started, as kafka consumer is ready.");
}, error => {
console.log("streamed failed to start: " + error);
});
}
exports.streamTest = streamTest;
我在Server.js文件中运行了这个,尽管我不认为这些信息真的有帮助。此外,我一直在尝试收集数据并将其存储在列表或数组中,但一直没有成功,所以如果有人能帮我做这件事,我将不胜感激。哦,如果有帮助的话,这是我的KSConfig文件。
"use strict";
const batchOptions = {
batchSize: 5,
commitEveryNBatch: 1,
concurrency: 1,
commitSync: false,
noBatchCommits: false
};
const nativeConfig = {
noptions: {
"metadata.broker.list": "localhost:9092", //native client requires broker hosts to connect to
"group.id": "kafka-streams-test-native",
"client.id": "kafka-streams-test-name-native",
"event_cb": true,
"compression.codec": "snappy",
"api.version.request": true,
"socket.keepalive.enable": true,
"socket.blocking.max.ms": 100,
"enable.auto.commit": false,
"auto.commit.interval.ms": 100,
"heartbeat.interval.ms": 250,
"retry.backoff.ms": 250,
"fetch.min.bytes": 100,
"fetch.message.max.bytes": 2 * 1024 * 1024,
"queued.min.messages": 100,
"fetch.error.backoff.ms": 100,
"queued.max.messages.kbytes": 50,
"fetch.wait.max.ms": 1000,
"queue.buffering.max.ms": 1000,
"batch.num.messages": 10000
},
tconf: {
"auto.offset.reset": "earliest",
"request.required.acks": 1
},
batchOptions
};
module.exports = {
nativeConfig
};
如有任何进一步的问题,我将予以答复。如有任何帮助或建议,我们将不胜感激。谢谢
首先,您需要知道Kafka不能保证一次交付(我们确实有幂等生产者和交易,但为了简单起见,我会在本讨论中假装它们不存在(。可能有相当多的场景会导致重复。
在您的用例中,假设一个消息批处理(m1-m5(由使用者读取。它被处理(并作为日志的一部分打印(,但在它被提交给代理之前,该过程已经终止。在这种情况下,当进程再次启动时,它会重新读取相同的消息(m1-m5(,因为它从未提交给代理。
也就是说,复制品也可以来自生产者。假设kafka生产者发送了一条消息,代理收到了它,但由于某些网络故障,来自代理的ack丢失了。在这种情况下,生产者可以重新发送导致重复的消息。因此,总之,你应该在你的kafka管道中期待重复,并因此尝试使其幂等。