我正试图找到一种方法,在发生异常时记录偏移量。
以下是我正在努力实现的目标:
void createTopology(StreamsBuilder builder) {
builder.stream(topic, Consumed.with(Serdes.String(), new JsonSerde()))
.filter(...)
.mapValues(value -> {
Map<String, Object> output;
try {
output = decode(value.get("data"));
} catch (DecodingException e) {
LOGGER.error(e.getMessage());
// TODO: LOG OFFSET FOR FAILED DECODE HERE
return new ArrayList<>();
}
...
return output;
})
.filter((k, v) -> !(v instanceof List && ((List<?>) v).isEmpty()))
.to(sink_topic);
}
我发现:https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#streams-开发人员指南dsl有状态转换在我的理解中,我需要使用处理器API,但仍然没有找到解决问题的方法。
ValueTransfomer
也可以通过init
传递的ProcessorContext
访问偏移量,我相信这要容易得多。
以下是IUSR建议的解决方案:https://stackoverflow.com/a/73465691/14945779(谢谢(:
static class InjectOffsetTransformer implements ValueTransformer<JsonObject, JsonObject> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public JsonObject transform(JsonObject value) {
value.addProperty("offset", context.offset());
return value;
}
@Override
public void close() {
}
}
void createTopology(StreamsBuilder builder) {
builder.stream(topic, Consumed.with(Serdes.String(), new JsonSerde()))
.filter(...)
.transformValues(InjectOffsetTransformer::new)
.mapValues(value -> {
Map<String, Object> output;
try {
output = decode(value.get("data"));
} catch (DecodingException e) {
LOGGER.warn(String.format("Error reading from topic %s. Last read offset %s:", topic, lastReadOffset), e);
return new ArrayList<>();
}
lastReadOffset = value.get("offset").getAsLong();
return output;
})
.filter((k, v) -> !(v instanceof List && ((List<?>) v).isEmpty()))
.to(sink_topic);
}