如果拓扑中出现异常,则跳过记录



我们正在编写一个Kafka Streams拓扑,它聚合数据并实时显示数据。我们希望使显示尽可能健壮-理想情况下,记录记录并继续任何异常。

根据文件,我们和进行了一些测试

  • 处理Kafka流中的异常
  • 使用Kafka';s流API
  • https://groups.google.com/g/confluent-platform/c/p75CleJ9yU0

Kafka Streams非常支持处理Producer中或反序列化过程中发生的异常。所提供的LogAndContinueExceptionHandler正好给出了我们想要的行为。然而,我们的主要问题是处理过程中发生的异常(例如.mapValues().leftJoin()

我们的想法基本上是验证的先决条件

  1. 在反序列化期间,如果未满足,则抛出反序列化异常(并记录并继续(
  2. 作为处理功能中的检查,如果无法执行计算,则返回默认值(/ by zero error等(

但是,如果数据中出现不可预见的情况,则仍可能出现异常,拓扑将关闭。

Kafka Streams提供了一个UncaughtExceptionHandler,但它是在线程已经死后调用的,因此它不能用于防止拓扑关闭。

有没有办法编写一个跳过记录的UncaughtException处理程序?或者,我们可以在处理函数内的try-catch块中跳过当前记录的机制?

我认为最好的解决方案是以从不抛出任何异常的方式编写处理操作(例如:Mapper、Filter等(。为此,您可以使用一个包装器对象,该对象可以是Success或in Error(例如:scala中的Either类型(。之后,可以使用branch()方法获得两个流:一个用于成功记录,另一个用于错误。

下面的代码显示了基本思想:

public static void main(String[] args) {
var builder = new StreamsBuilder();
KStream<Object, Result<Object>> stream = builder.stream("my-topic")
.map((k, v) -> {
try {
// unsafe operation, i.e that may throw an exception
return KeyValue.pair(k, new Success<>(v));
} catch (Exception e) {
return KeyValue.pair(k, new Error<>(e));
}
});
KStream<Object, Result<Object>>[] branch = stream.branch((k, v) -> !v.hasError(), (k, v) -> v.hasError());
// Handle the success steam
KStream<Object, Result<Object>> successStream = branch[0];
// Handle the error steam, e.g:  log errors, write errors to a Dead Letter Queue
KStream<Object, Result<Object>> errorStream = branch[1];

}
public interface Result<T> {
T get() throws Exception;
Exception exception();
boolean hasError();
}
public static class Success<T> implements Result<T> {
private final T value;
public Success(T value) {
this.value = value;
}
@Override
public T get() throws Exception {
return value;
}
@Override
public Exception exception() {
return null;
}
@Override
public boolean hasError() {
return false;
}
}
public static class Error<T> implements Result<T> {
private final Exception error;
public Error(Exception error) {  this.error = error; }
@Override
public T get() throws Exception{
throw error;
}
@Override
public Exception exception() {
return error;
}
@Override
public boolean hasError() {
return true;
}
}

此外,对于您提到的反序列化异常,Azkarra Streams项目提供了一些方便的java类,可以帮助您(例如,SafeSerdes、DeadLetterTopicExceptionHandler(:GitHub

相关内容

  • 没有找到相关文章

最新更新