删除一条消息Kafka Streams Topology



我想知道是否有办法从流拓扑中删除记录/消息?

我有如下设置:

builder.stream("my-source-topic")
.map(CustomMapper)
.mapValues(CustomValueMapper)
.filterNot(CustomFilter)
.transformValues(CustomValueTransformer)
.toStream()

每个CustomMapper/CustomFilter等都会覆盖它们各自的应用/转换方法,它们可能如下所示,注意到错误可能是不可恢复的,这是一个不错的解决方案,这些消息将手动处理,并编写相应的日志。假设不可恢复的错误发生在第一个映射期间,我现在如何阻止后面的阶段处理该记录,我想停止处理该记录并转移到下一个记录。

@Override
public V transform(K readOnlyKey, V value) {
try {
// do some logic
} catch(Exception e){
// process error - this might be unrecoverable.

dropRecord(); // this is what i would be looking for if possible
}
}

我可以终止线程,并让customUncaughtException处理程序重新调度线程,该线程不会提交偏移量,因此尝试再次处理错误记录。

为传递的对象创建包装器需要在每个处理步骤中添加一个检查,以查看记录是否仍然有效。

在每个处理步骤之前添加.branch((也需要大量的返工。

只需返回null,就可以将消息放入Transformer中。请参阅Transformer#transform的Javadoc。所以你的例子是:

@Override
public V transform(K readOnlyKey, V value) {
try {
// do some logic
} catch(Exception e){
// process error - this might be unrecoverable.

return null;
}
}

请注意,您当前只能在Transformer中执行此操作,但不能在ValueTransformer中执行。

相关内容

  • 没有找到相关文章

最新更新