I have a topic to which one service writes messages like the following:
{"version":"1.0.0","action":"CREATE_BACKUP","actionId":"58048","backupManagerId":"config_data","status":"STARTED"}
In the producer's log I see:
{"version":"1.1.0","timestamp":"2021-10-28T18:30:16.039+00:00","severity":"info","service_id":"bro",
"message":"Sent notification: Notification [version=1.0.0, action=RESTORE, actionId=31247, backupManagerId=test_data, status=COMPLETED]",
"extra_data":{"location":{"class":"com.test.mgmt.backupandrestore.notification.KafkaNotifier"}}}
I have a consumer service that hits this error whenever a message is produced:
{"version": "1.0.0", "timestamp": "2021-10-28T20:19:06.265+0000", "severity": "error", "service_id": "eric-cnels-license-front-end", "message":
"18@LogAccessor.java::error:149@Consumer exception@ java.lang.IllegalStateException:
This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer'
in the value and/or key deserializer// at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
~[spring-kafka-2.7.6.jar:2.7.6]// at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
~[spring-kafka-2.7.6.jar:2.7.6]//
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602)
[spring-kafka-2.7.6.jar:2.7.6]// at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210)
[spring-kafka-2.7.6.jar:2.7.6]// at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_302]//
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_302]//
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]//
Caused by: org.apache.kafka.common.errors.SerializationException:
Error deserializing key/value for partition bro-notification-0 at offset 6. If needed, please seek past the record to continue consumption.
//Caused by: java.lang.IllegalArgumentException:
The class 'com.test.mgmt.backupandrestore.notification.Notification' is not in the trusted packages:
[java.util, java.lang, com.test.server.configuration.messagebus, com.test.server.configuration.messagebus.*].
If you believe this class is safe to deserialize, please provide its name.
If the serialization is only done by a trusted source, you can also enable trust all (*).//
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
~[spring-kafka-2.7.6.jar:2.7.6]// at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
~[spring-kafka-2.7.6.jar:2.7.6]// at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:521)
~[spring-kafka-2.7.6.jar:2.7.6]// at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1387) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1618) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:687) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:638) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[kafka-clients-2.7.1.jar:?]//
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[kafka-clients-2.7.1.jar:?]//
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1412)
~[spring-kafka-2.7.6.jar:2.7.6]//
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249)
~[spring-kafka-2.7.6.jar:2.7.6]//
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
~[spring-kafka-2.7.6.jar:2.7.6]// ... 3 more//@"}
The consumer configuration is:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Bean
    public ConsumerFactory<String, BroMessageEvent> broConsumerFactory(){
        String bootstrapAddress = SystemConfiguration.MESSAGE_BUS_BOOTSTRAP_SERVERS;
        String groupId = SystemConfiguration.MESSAGE_BUS_GROUP; // this must be random
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 300000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES,"*");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(BroMessageEvent.class));
    }

    @Bean(name = "broListnerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, BroMessageEvent> broListnerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, BroMessageEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(broConsumerFactory());
        return factory;
    }

    @Bean
    public KafkaEventListener kafkaEventListener() {
        //This is handled by Spring and will allocate it along with the springcontext in the main
        return new KafkaEventListener();
    }
}
And BroMessageEvent is:
package com.ericsson.licensing.nels.server.configuration.messagebus;

public class BroMessageEvent{
    private String version;
    private String action;
    private String actionId;
    private String backupManagerId;
    private String status;

    public BroMessageEvent(){
        this.version = null;
        this.action = null;
        this.actionId = null;
        this.backupManagerId = null;
        this.status = null;
    }

    public BroMessageEvent(String versionIn, String actionIn, String actionIdIn, String backupManagerIdIn, String statusIn){
        this.version = versionIn;
        this.action = actionIn;
        this.actionId = actionIdIn;
        this.backupManagerId = backupManagerIdIn;
        this.status = statusIn;
    }

    public String getVersion(){return this.version;}
    public void setVersion(String versionIn){this.version=versionIn;}
    public String getAction(){return this.action;}
    public void setAction(String actionIn){this.action=actionIn;}
    public String getActionId(){return this.actionId;}
    public void setActionId(String actionIdIn){this.actionId=actionIdIn;}
    public String getBackupManagerId(){return this.backupManagerId;}
    public void setBackupManagerId(String backupManagerIdIn){this.backupManagerId=backupManagerIdIn;}
    public String getStatus(){return this.status;}
    public void setStatus(String statusIn){this.status=statusIn;}

    public String toString(){
        return "Version: " + version + ", Action: " + action +", BackupManagerId: " + backupManagerId + ", Status: " +status;
    }
}
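Is it possible that some configuration is wrong, perhaps the JsonDeserializer class mapping? Could "extra_data" be causing me trouble? When I connect to Kafka and list the messages on the topic, I only see messages formatted like the one at the top.
What is causing this error? Any suggestions?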
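You can resolve this by replacing the trusted-packages value * with the path to your package (see the line below). Alternatively, use a StringJsonMessageConverter together with a StringDeserializer or ByteArrayDeserializer instead of the JsonDeserializer.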
props.put(JsonDeserializer.TRUSTED_PACKAGES,"<path-to-your-package>");
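One thing worth checking in the question's configuration: the JsonDeserializer is passed to DefaultKafkaConsumerFactory as an instance, and in that case Kafka does not configure it from props, so the TRUSTED_PACKAGES entry is most likely never applied. That would match the exception, which lists only the default packages plus the package of BroMessageEvent. Below is a minimal configuration sketch, assuming spring-kafka 2.7.x as in the stack trace. It is a variation on the suggestion above rather than the same fix: instead of trusting the producer's package, it tells the deserializer to ignore the type header, on the assumption that the producer-side Notification class is not on the consumer's classpath. The class name BroConsumerFactorySketch and the parameters bootstrapServers and groupId are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.ericsson.licensing.nels.server.configuration.messagebus.BroMessageEvent;

// Hypothetical helper class; the name and the method parameters are assumptions.
public class BroConsumerFactorySketch {

    public static ConsumerFactory<String, BroMessageEvent> create(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // The deserializer is configured on the instance itself, because an
        // instance handed to the factory is not configured from props.
        // The second argument (useHeadersIfPresent = false) makes it always map
        // the JSON to BroMessageEvent and ignore the producer's type header.
        JsonDeserializer<BroMessageEvent> json =
                new JsonDeserializer<>(BroMessageEvent.class, false);

        // Wrapping in ErrorHandlingDeserializer follows the hint in the exception
        // text, so a bad record reaches the error handler instead of failing poll().
        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                new ErrorHandlingDeserializer<>(json));
    }
}
```

If the type header should be honoured instead, calling addTrustedPackages(...) on the JsonDeserializer instance is the instance-level counterpart of the props entry, but the class named in the header then has to be resolvable on the consumer side.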
See also these answers: answer1, Answer2.
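To see why the exception rejects the class, it can help to replay the check by hand. The sketch below is a simplified, hypothetical model of a trusted-packages test written with the standard library only; it is not spring-kafka's actual implementation, and the class name TrustedPackagesSketch and the matching rules (exact package match, a ".*" suffix for sub-packages, "*" for trust all) are assumptions based on the wording of the error message.

```java
import java.util.List;

// Simplified, hypothetical model of a trusted-packages check.
// NOT spring-kafka's code; the matching rules are assumptions.
final class TrustedPackagesSketch {

    private TrustedPackagesSketch() {
    }

    /** Returns true if the package of className is covered by a trusted entry. */
    static boolean isTrusted(String className, List<String> trustedPackages) {
        // "*" is treated as "trust everything", as the error message suggests.
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        for (String trusted : trustedPackages) {
            if (trusted.endsWith(".*")) {
                // "a.b.*" covers sub-packages of a.b in this sketch;
                // the prefix keeps the trailing dot ("a.b.").
                String prefix = trusted.substring(0, trusted.length() - 1);
                if (packageName.startsWith(prefix)) {
                    return true;
                }
            } else if (trusted.equals(packageName)) {
                return true;
            }
        }
        return false;
    }
}
```

Fed with the package list from the stack trace, this model rejects com.test.mgmt.backupandrestore.notification.Notification, which is the class named in the producer's type header, and accepts a class in com.test.server.configuration.messagebus. A list containing "*" accepts both, which is another sign that the "*" from props never reached the deserializer.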