我有一个架构,我们有两个独立的应用程序。原始来源是SQL数据库。App1倾听到CDC表以跟踪该数据库中表的更改,将这些更改归一化和序列化。它获取这些序列化消息,并将其发送到Kafka主题。App2聆听该主题,将消息调整为不同的格式,并通过http将这些改编的消息发送到其各自的目的地。
所以我们的流架结构看起来像:
sql(cdc event( -> app1(事件归一化( -> kafka-> app2(将事件调整为端点( ->各种端点
我们希望在失败的情况下添加错误处理,无法容忍重复的事件,丢失事件或订单更改。鉴于上面的体系结构,我们真正关心的是,完全适用于从App1到App2(我们的独立生产者和消费者(的消息
我正在阅读的所有内容以及我发现的每个示例交易API都指向"流"。看起来KAFKA流媒体API是用于从Kafka主题中获取输入,进行处理并将其输出到另一个Kafka主题的单个应用程序,该应用程序似乎不适用于我们对Kafka的使用。这是Confluent文档的摘录:
现在,流处理不过是一个已阅读过程的操作 关于卡夫卡主题;消费者阅读Kafka主题的消息,有些 处理逻辑会转换这些消息或修改状态 由处理器维护,生产者写了由此产生的 给另一个Kafka主题的消息。恰好一次流处理 仅仅可以执行重读程序操作的能力 一度。在这种情况下,"获得正确的答案"意味着不要丢失 任何输入消息或产生任何重复输出。这是 行为用户期望的一次流处理器。
我正在努力地围绕如何与Kafka主题完全使用,或者是否为非"流"用例构建了Kafka的确切联系。我们将必须建立自己的重复数据删除和容忍度吗?
如果您使用的是使用Kafka的流API(或者使用Kafka完全支持一开始处理的另一个工具(,则跨应用程序涵盖了Kafka的恰到好处的语义(EOS(:
topic A --> App 1 --> topic B --> App 2 --> topic C
在您的用例中,一个问题是初始CDC步骤是否也支持EOS。换句话说,您必须提出一个问题:涉及哪些步骤,并且所有步骤均由EOS涵盖?
在下面的示例中,当(仅当(初始CDC步骤也支持EOS时,EOS受支持。
。SQL --CDC--> topic A --> App 1 --> topic B --> App 2 --> topic C
如果您将Kafka Connect用于CDC步骤,则必须检查您是否支持EOS是或NO。
我正在阅读的所有内容,我发现的每个示例都指向"流"。
Kafka生产商/消费者客户的交易API为EOS处理提供了原始功能。位于生产者/消费者客户端的Kafka流,使用此功能以某种方式实现EOS,即具有几行代码的开发人员可以轻松地使用它(例如,在应用程序需求时自动照顾国家管理进行诸如聚合或加入之类的状态操作(。也许生产者/消费者< -> kafka流是您阅读文档后的困惑吗?
当然,您还可以在开发应用程序时使用基础的Kafka生产商和消费者客户(与交易API("构建自己的",但这是更多的工作。
我正在努力地围绕如何与Kafka主题完全使用,或者是否为非"流"用例构建了Kafka的确切联系。我们必须建立自己的重复数据删除和容忍度吗?
不确定"非流式"用例的含义。如果您的意思是"如果我们不想使用Kafka流或KSQL(或者可以从Kafka读取到处理数据的其他现有工具(,那么我们需要在应用程序中实现EOS是什么?",那么答案是"是的,在这种情况下,您必须直接使用Kafka生产者/客户,并确保您对他们做任何事情都可以正确实现EOS处理。"(而且由于后者很困难,因此将此EOS功能添加到Kafka流中。(
我希望有帮助。