I created a Kafka Streams Spring Boot application. Input: JSON format. Output: AVRO format.
When I parse the JSON and find it is corrupted or invalid, I want to skip it.
But when I try to return an empty AVRO class instead, the Streams API (which automatically publishes to the output channel) throws:
Exception in thread "AutonomousStreamListener-process-applicationId-31990c75-5965-42b6-906a-92e13855e40c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=FAULT_MANAGEMENT, partition=0, offset=4, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null of string of com.fujitsu.fnc.fums.faultMgmt.avro.model.Fault
at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:183)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:177)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:110)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:59)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
How do I handle this situation? It requires skipping a message in the stream rather than always returning one.
Pseudo-code:
@StreamListener("Processor-input-channel")
@SendTo("Processor-output-channel")
public KStream<String, AVROClass> process(KStream<String, String> input) {
    // parse input, map to Fault, change the received key to a timestamp, and send
    KStream<String, AVROClass> kStream = input
        .mapValues(v -> service.getAVROResponse(v))
        .map((k, v) -> KeyValue.pair(Long.toString(System.currentTimeMillis()), v));
    kStream.foreach((k, v) -> log.info(String.format("Key: %s, Value: %s", k, v)));
    return kStream;
}
[Edit]:
Meanwhile, the application seems to stall there and the stream thread shuts down; the application no longer receives new messages.
Any help with AVRO stream exception handling and the pseudo-code is appreciated.
You can skip messages with a filter operation.
@StreamListener("Processor-input-channel")
@SendTo("Processor-output-channel")
public KStream<String, AVROClass> process(KStream<String, String> input) {
    // parse input, map to Fault, change the received key to a timestamp, and send
    KStream<String, AVROClass> kStream = input
        .mapValues(v -> service.getAVROResponse(v))
        .map((k, v) -> KeyValue.pair(Long.toString(System.currentTimeMillis()), v))
        .filter((k, v) -> v != null) // <-- add your own condition here
        .peek((k, v) -> log.info(String.format("Key: %s, Value: %s", k, v)));
    return kStream;
}
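For the filter to do its job, the service method has to return null (rather than throw) when the JSON is corrupted. Here is a minimal sketch of that pattern; `FaultParser` and its stand-in parsing logic are hypothetical (the real method maps JSON to an Avro-generated `Fault` class, which is not shown in the question), but the try..catch-and-return-null shape is the point:

```java
// Hypothetical stand-in for service.getAVROResponse(). In the real application
// this would map a JSON string to an Avro-generated class; here a simple
// Integer parse plays that role so the pattern is runnable on its own.
public class FaultParser {

    public static Integer getAVROResponse(String json) {
        try {
            // stand-in for real JSON -> Avro mapping, which may throw on bad input
            return Integer.parseInt(json.trim());
        } catch (RuntimeException e) {
            // parse failed: return null so the downstream
            // .filter((k, v) -> v != null) silently drops this record
            // instead of crashing the stream thread at serialization time
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(getAVROResponse("42"));        // valid input
        System.out.println(getAVROResponse("not valid")); // corrupted input -> null
    }
}
```

Returning null (and filtering it out) keeps the topology linear; if you need to route bad records somewhere instead of dropping them, `branch` or a dead-letter topic is the usual alternative.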
You also don't need foreach just to print each message; that is easily done with peek.
> Meanwhile, the application seems to stall there and the stream thread shuts down; the application no longer receives new messages.
Setting the deserialization-exception-handler property to logAndContinue lets you ignore deserialization exceptions. But if there is any error during message processing on the service side, you can handle it by wrapping it in a try..catch block.
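With the Spring Cloud Stream Kafka Streams binder, that handler is configured as a binder property; a sketch of the application.yml entry (exact property name and accepted values depend on your binder version, so check its reference documentation):

```
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            deserialization-exception-handler: logAndContinue
```

Note this handler only covers failures while deserializing the inbound record; exceptions thrown inside your own processing logic (such as getAVROResponse) still need the try..catch described above.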