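I am using
Spring Boot: 2.3.5.RELEASE, Spring Cloud: Hoxton.SR8
I am trying out a Spring Cloud Stream Kafka Streams application. Everything runs fine until a deserialization exception occurs; then the application shuts down every time.
I want to skip the bad record and continue consuming the Kafka topic, but I have not been able to do that. Configuration: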
spring:
  application:
    name: statsprocessor.${ENV}.${INSTANCE_ID}
  cloud:
    stream:
      instance-index: ${INSTANCE_INDEX}
      instance-count: ${INSTANCE_COUNT}
      bindings:
        statsInput:
          destination: ${STORE_INPUT_TOPIC}
          group: statsprocessor.${ENV}
          consumer:
            concurrency: ${CONCURRENCY}
            partitioned: true
            useNativeDecoding: true
      kafka:
        streams:
          bindings:
            statsInput:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: per.shades.framework.kafka.serdes.CotsEventSerde
                startOffset: earliest
                applicationId: statsprocessor.${ENV}
                autoCommitOnError: false
                dlqName: ${STORE_INPUT_DLQ}
                useNativeDecoding: true
                configuration:
                  client.id: statsprocessor.${ENV}.${INSTANCE_ID}
          binder:
            auto-add-partitions: true
            auto-create-topics: true
            deserializationExceptionHandler: logAndContinue
            brokers:
              - ${KAFKA_URI}
            configuration:
              num.stream.threads: ${CONCURRENCY}
              buffered.records.per.partition: 500
              cache.max.bytes.buffering: 10485760
              commit.interval.ms: 500
              state.dir: ${KAFKA_STATE_DIR}
              replication.factor: ${DEFAULT_KAFKA_TOPIC_REPLICATION_FACTOR}
              reconnect.backoff.ms: 15000
              retry.backoff.ms: 10000
              producer.linger.ms: 100
              producer.acks: all
              producer.retries: 3
              producer.batch.size: 16384
              consumer.max.poll.records: 100
              consumer.session.timeout.ms: 60000
The error I get is
Exception in thread "statsprocessor.local.1-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Exception caught in process. taskId=0_4, processor=KSTREAM-SOURCE-0000000000, topic=cots-event-store, partition=4, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException:
ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
Make sure the Processor can accept the deserialized input of type key: unknown because key is null, and value: per.shades.model.events.CotsEvent.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
....
....
Caused by: java.lang.ClassCastException: class per.shades.model.events.CotsEvent cannot be cast to class per.shades.model.stats.StatsMetadata (per.shades.model.events.CotsEvent and per.shades.model.stats.StatsMetadata are in unnamed module of loader 'app')
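Right now I am using the setting deserializationExceptionHandler: logAndContinue, but it has no effect. According to the documentation, it should simply log the error and continue processing, i.e. skip the bad record. That is not happening. Instead I see this error: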
All stream threads have died. The instance will be in error state and should be closed.
I also tried
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer()
{
    return streamsBuilderFactoryBean ->
    {
        streamsBuilderFactoryBean.getStreamsConfiguration().put(
            org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            ContinueOnErrorHandler.class);
    };
}
The handler class is
public class ContinueOnErrorHandler implements DeserializationExceptionHandler
{
    @Override
    public DeserializationHandlerResponse handle(ProcessorContext processorContext, ConsumerRecord<byte[], byte[]> consumerRecord, Exception e)
    {
        System.out.println(">>>>>>> We are here");
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> map)
    {
    }
}
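But this does not work either. The handler never gets called.
I do not want to delete my Kafka topic just to get rid of the bad records. It is really hard to get past a simple deserialization error. Please help!!
Edit: binding code: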
@Configuration
public interface StatsStreamBindings
{
    String statsInput = "statsInput";

    @Input(statsInput)
    KStream<String, StatsMetadata> statsInput();
}
Processor signature:
public void aggregateStats(KStream<String, StatsMetadata> inputStream)
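Caused by: java.lang.ClassCastException: class per.shades.model.events.CotsEvent cannot be cast to class per.shades.model.stats.StatsMetadata
You are not getting a DeserializationException but a ClassCastException invoking Processor; this means the deserializer successfully deserialized the record to a CotsEvent, but your processor expects StatsMetadata.
The DeserializationExceptionHandler only handles DeserializationExceptions, so it is never invoked here. The value type produced by the configured valueSerde and the KStream<String, StatsMetadata> type in the binding and processor need to agree.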
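To illustrate why the handler is never reached, here is a minimal plain-Java sketch with no Kafka dependency. The classes CotsEvent and StatsMetadata are empty stand-ins for the real ones, and deserialize, read and run are hypothetical helpers invented for this example; this is not the Kafka Streams API. The point is only that, because of generic type erasure, the "deserialization" step succeeds and the cast fails later, at the point where the value is used as StatsMetadata.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SerdeMismatchDemo {
    // Stand-ins for the real model classes (assumption: no relation between them).
    static class CotsEvent {
        final String payload;
        CotsEvent(String payload) { this.payload = payload; }
    }

    static class StatsMetadata { }

    // Hypothetical stand-in for the configured value deserializer:
    // it always succeeds and always yields a CotsEvent.
    static Object deserialize(byte[] bytes) {
        return new CotsEvent(new String(bytes, StandardCharsets.UTF_8));
    }

    // Generic read step. The cast to V is unchecked (erased at runtime), so it
    // cannot fail here; the catch block plays the role of a deserialization
    // exception handler and only sees failures thrown by deserialize().
    @SuppressWarnings("unchecked")
    static <V> V read(byte[] bytes, List<String> log) {
        try {
            return (V) deserialize(bytes);
        } catch (RuntimeException e) {
            log.add("deserialization handler called");
            return null;
        }
    }

    // The "processor" expects StatsMetadata, so the compiler inserts the real
    // cast at the assignment below, outside the handler's reach.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try {
            StatsMetadata value =
                SerdeMismatchDemo.<StatsMetadata>read("x".getBytes(StandardCharsets.UTF_8), log);
            log.add("processed " + value);
        } catch (ClassCastException e) {
            log.add("ClassCastException in processor");
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it logs only the ClassCastException line and never the handler line, which mirrors the stack trace in the question: the record was deserialized fine, and the failure happens when the processor treats the value as a different type.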