我想知道是否有办法从流拓扑中删除记录/消息?
我有如下设置:
builder.stream("my-source-topic")
.map(CustomMapper)
.mapValues(CustomValueMapper)
.filterNot(CustomFilter)
.transformValues(CustomValueTransformer)
.toStream()
每个CustomMapper/CustomFilter等都会覆盖它们各自的应用/转换方法,它们可能如下所示,注意到错误可能是不可恢复的,这是一个不错的解决方案,这些消息将手动处理,并编写相应的日志。假设不可恢复的错误发生在第一个映射期间,我现在如何阻止后面的阶段处理该记录,我想停止处理该记录并转移到下一个记录。
@Override
public V transform(K readOnlyKey, V value) {
try {
// do some logic
} catch(Exception e){
// process error - this might be unrecoverable.
dropRecord(); // this is what i would be looking for if possible
}
}
我可以终止线程,并让customUncaughtException处理程序重新调度线程,该线程不会提交偏移量,因此尝试再次处理错误记录。
为传递的对象创建包装器需要在每个处理步骤中添加一个检查,以查看记录是否仍然有效。
在每个处理步骤之前添加.branch((也需要大量的返工。
只需返回null
,就可以将消息放入Transformer
中。请参阅Transformer#transform的Javadoc。所以你的例子是:
@Override
public V transform(K readOnlyKey, V value) {
try {
// do some logic
} catch(Exception e){
// process error - this might be unrecoverable.
return null;
}
}
请注意,您当前只能在Transformer
中执行此操作,但不能在ValueTransformer
中执行。