处理JSON解析错误,而不会崩溃Kafka流处理器应用程序



我有一个kafka流媒体应用程序,该应用程序映射/转换JSON消息并将输出流传输到主题。

KStream<String, String> logMessageStream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
logMessageStream.map((k, v) -> { //Map record 
                try { // Map record to (requestId, message)
                    // readValue throws IOException, JsonParseException, JsonMappingException
                    LogMessage logMessage = objectMapper.readValue(v, LogMessage.class);
                    return new KeyValue<>(logMessage.requestId(), logMessage);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return null; // <== RETURNS null due to caught exception
}).toStream().to(outoutTopic)

现在,如果输入记录JSON包含无效的语法,则我将获得解析错误,流应用程序崩溃了:

java.lang.NullPointerException
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    ....

我想在映射时消耗此错误,然后继续处理其他消息。我可以设置任何处理程序以使消费者例外。寻找建议。

谢谢..

您也可以利用 StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG属性,如https://docs.confluent.io/current/current/streams/faq.html#handling-handling-corrupted-corrupted-recorts-and-records-and-records-and-records-and-records-and-records-and-records-and-records-and-records-and-records-and-records-and-records-and------挑战 - 纠纷奖励。

Properties streamsSettings = new Properties();
streamsSettings.put(
  StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
  LogAndContinueExceptionHandler.class.getName()
);

而不是使用map(),您可以使用允许您返回零元素的flatMap()。正如Javadocs中指出的那样,不允许从map()返回null

提供的{@link keyValuemapper}必须返回{@link keyValue} type,并且不得返回{@code null}。

请注意,flatMap()也不允许返回null。但是它接受您可以迭代的任何东西(即Iterable(。例如,您可以在成功方面返回Collections.singleton(),而Collection.emptySet()在失败中。

只需查看setuncaughtexceptionhandler方法:

KafkaStreams streams = new KafkaStreams(topology, props);    
streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
        // your logic here
    });

相关内容

  • 没有找到相关文章

最新更新