如何处理 Kafka KStream 意外状态



我有一个Kafka流,设置如下:

    主题
  1. A 上出现一条消息,状态为"已创建"
  2. 调用远程 REST 终结点以通知已收到消息
  3. 然后转换消息(状态设置为"通知"(并发送回到主题 A

我知道如何为快乐路径设置它(REST 端点返回 HTTP 2xx(。 我的问题是:如果我的终端节点返回 400,我如何配置我的 kstream 来处理它而不发出消息?

这是我的 KSteam 配置:

    final KStream<String, Message> stream = streamsBuilder.stream(topic,
            Consumed.with(Serdes.String(), messageJsonSerde));
    stream
            .filter((key, value) -> MessageType.CASE_MATCHED.equals(
                    value.getEventType()))
            .mapValues(notifierService::handleNotification) // calls REST endpoint
            .to(topic, Produced.with(Serdes.String(), messageJsonSerde));
    return stream;

我想最简单的方法是,在mapValue之后添加另一个filter,以丢弃未成功处理的消息。

您可以更改第一个mapValue以将此信息添加到记录中,以便filter可以访问它。此外,您可以在第二个filter之后执行第二次mapValue,以删除此附加success标志,以将未修改的消息写回 Kafka。

当然,这不会重新处理任何失败的消息,而只会从输出中过滤掉失败的消息。

相关内容

  • 没有找到相关文章

最新更新