读取和处理来自 Kafka 的一批消息



我想定期从 kafka 主题读取一批消息,或者当读取的消息数量达到一定数量时,将它们作为批次发送到下游系统。目前,我的 kafka 拓扑由一个处理器终止,该处理器保存消息,然后使用标点符号方法以增量方式处理批处理。

但是,我不确定这是否完美,因为如果应用程序在调用标点符号方法之前崩溃,我认为某些消息会丢失(即消费者认为它已经完成了它们,但它们不会出现在下游系统中)。

batchQueue = new LinkedBlockingQueue<String>(batchSize);
KStream<String, String> inputStream = builder
.stream(Serdes.String(), Serdes.String(), "source-topic")
.process(new ProcessorSupplier<String, String>() {
@Override
public Processor<String, String> get() {
return new AbstractProcessor<String, Wrapper>() {
@Override
public void init(ProcessorContext context) {
super.init(context);
context.schedule(flushPeriod);
}
@Override
public void process(String key, String value) {
batchQueue.add(value);
if (batchQueue.remainingCapacity() == 0) {
processQueue();
}
}
@Override
public void punctuate(long timestamp) {
processQueue();
context().commit();
}
}
@Override
public void close() {}
};
}
});
  • 有没有办法使这种方法更健壮?也许是窗口,但我真的不明白这一点。
  • 我必须为此使用卡夫卡连接吗?由于它的错误处理能力,我倾向于远离这一点:https://groups.google.com/forum/#!topic/confluent-platform/OBuLbVHbuyI

有没有办法使这种方法更健壮?也许是窗口,但我真的不明白这一点。

我建议将数据转换部分(为此我使用 Kafka 的 Streams API)和数据摄取部分(我将使用 Kafka 的 Connect API)解耦,在该部分写入下游系统。

简而言之,为什么您的转换逻辑要与最终将转发到的下游系统之一的细节(此处:昂贵的插入!)相结合并需要担心? 理想情况下,转型的责任应该只是转型,而不应涉及外部下游系统的运营方面。 例如,如果您最终想要将转换后的数据转发到第二个下游系统(或第三个,...),那么耦合方法将意味着您必须更新/重新部署/...您的应用程序,即使其转换逻辑均未更改。

分离转换和引入的另一个好处是,转换逻辑将简单得多,因为它不必考虑由于下游系统运行缓慢、不可用等导致的故障。例如,它不需要实现/测试复杂的重试逻辑。

我必须为此使用卡夫卡连接吗?

不,你不需要使用Kafka Connect,但它可以说是完成这项任务的最佳工具。

由于[Kafka Connect]的错误处理能力,我倾向于使用它:https://groups.google.com/forum/#!topic/confluent-platform/OBuLbVHbuyI

在最新版本的Kafka Connect中,错误处理实际上相当不错。 此外,链接讨论中的问题可以通过提供更强大的转换器(想想:串行器/解串器)供Connect使用来轻松解决。

此外,如该链接中所述,当您在将数据写入 Kafka 之前验证数据的兼容性时,讨论的特定问题变得不那么重要。 您可以通过利用Confluent的模式注册表(https://github.com/confluentinc/schema-registry,文档或类似工具来实现这一点。 既然您提出了"如何使其更健壮"的问题,那么在部署到生产环境之前,考虑数据序列化和演进是我要考虑的另一个重要方面。

希望这有帮助!

相关内容

  • 没有找到相关文章

最新更新