我有一个Kafka流,设置如下:
- 主题
- A 上出现一条消息,状态为"已创建"
- 调用远程 REST 终结点以通知已收到消息
- 然后转换消息(状态设置为"通知"(并发送回到主题 A
我知道如何为快乐路径设置它(REST 端点返回 HTTP 2xx(。 我的问题是:如果我的终端节点返回 400,我如何配置我的 kstream 来处理它而不发出消息?
这是我的 KSteam 配置:
final KStream<String, Message> stream = streamsBuilder.stream(topic,
Consumed.with(Serdes.String(), messageJsonSerde));
stream
.filter((key, value) -> MessageType.CASE_MATCHED.equals(
value.getEventType()))
.mapValues(notifierService::handleNotification) // calls REST endpoint
.to(topic, Produced.with(Serdes.String(), messageJsonSerde));
return stream;
我想最简单的方法是,在mapValue
之后添加另一个filter
,以丢弃未成功处理的消息。
您可以更改第一个mapValue
以将此信息添加到记录中,以便filter
可以访问它。此外,您可以在第二个filter
之后执行第二次mapValue
,以删除此附加success
标志,以将未修改的消息写回 Kafka。
当然,这不会重新处理任何失败的消息,而只会从输出中过滤掉失败的消息。