模型类"is not in the trusted packages"



我有一个主题,其中一个服务写入以下消息:

{"version":"1.0.0","action":"CREATE_BACKUP","actionId":"58048","backupManagerId":"config_data","status":"STARTED"} 

从制作人的日志中我看到:

{"version":"1.1.0","timestamp":"2021-10-28T18:30:16.039+00:00","severity":"info","service_id":"bro",
"message":"Sent notification: Notification [version=1.0.0, action=RESTORE, actionId=31247, backupManagerId=test_data, status=COMPLETED]",
"extra_data":{"location":{"class":"com.test.mgmt.backupandrestore.notification.KafkaNotifier"}}}

我有一个消费者服务,它在产生消息时接收到这个错误:

{"version": "1.0.0", "timestamp": "2021-10-28T20:19:06.265+0000", "severity": "error", "service_id": "eric-cnels-license-front-end", "message": 
"18@LogAccessor.java::error:149@Consumer exception@ java.lang.IllegalStateException: 
This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' 
in the value and/or key deserializer//  at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
~[spring-kafka-2.7.6.jar:2.7.6]//  at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) 
~[spring-kafka-2.7.6.jar:2.7.6]//  
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) 
[spring-kafka-2.7.6.jar:2.7.6]//   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210)
[spring-kafka-2.7.6.jar:2.7.6]//   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_302]//  
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_302]//    
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]//
Caused by: org.apache.kafka.common.errors.SerializationException:
Error deserializing key/value for partition bro-notification-0 at offset 6. If needed, please seek past the record to continue consumption.
//Caused by: java.lang.IllegalArgumentException: 
The class 'com.test.mgmt.backupandrestore.notification.Notification' is not in the trusted packages: 
[java.util, java.lang, com.test.server.configuration.messagebus, com.test.server.configuration.messagebus.*]. 
If you believe this class is safe to deserialize, please provide its name. 
If the serialization is only done by a trusted source, you can also enable trust all (*).//    
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129) 
~[spring-kafka-2.7.6.jar:2.7.6]//  at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) 
~[spring-kafka-2.7.6.jar:2.7.6]//  at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:521) 
~[spring-kafka-2.7.6.jar:2.7.6]//  at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1387) ~[kafka-clients-2.7.1.jar:?]//    
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133) ~[kafka-clients-2.7.1.jar:?]//    
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1618) ~[kafka-clients-2.7.1.jar:?]//   
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454) ~[kafka-clients-2.7.1.jar:?]//    
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:687) ~[kafka-clients-2.7.1.jar:?]//   
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:638) ~[kafka-clients-2.7.1.jar:?]// 
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[kafka-clients-2.7.1.jar:?]//  
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[kafka-clients-2.7.1.jar:?]//    
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[kafka-clients-2.7.1.jar:?]//    
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1412) 
~[spring-kafka-2.7.6.jar:2.7.6]//  
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249) 
~[spring-kafka-2.7.6.jar:2.7.6]//  
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) 
~[spring-kafka-2.7.6.jar:2.7.6]//  ... 3 more//@"}

消费者配置为:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

@Bean
public ConsumerFactory<String, BroMessageEvent> broConsumerFactory(){
String bootstrapAddress = SystemConfiguration.MESSAGE_BUS_BOOTSTRAP_SERVERS;
String groupId = SystemConfiguration.MESSAGE_BUS_GROUP; // this must be random
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 300000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES,"*");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(BroMessageEvent.class));
}
@Bean(name = "broListnerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, BroMessageEvent> broListnerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, BroMessageEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(broConsumerFactory());
return factory;
}
@Bean
public KafkaEventListener kafkaEventListener() {
//This is handled by Spring and will allocate it along with the springcontext in the main
return new KafkaEventListener();
}
}

和BroMessageEvent是:

package com.ericsson.licensing.nels.server.configuration.messagebus;
public class BroMessageEvent{
private String version;
private String action;
private String actionId;
private String backupManagerId;
private String status;
public BroMessageEvent(){
this.version = null;
this.action = null;
this.actionId = null;
this.backupManagerId = null;
this.status = null;
}
public BroMessageEvent(String versionIn, String actionIn, String actionIdIn, String backupManagerIdIn, String statusIn){
this.version = versionIn;
this.action = actionIn;
this.actionId = actionIdIn;
this.backupManagerId = backupManagerIdIn;
this.status = statusIn;
}
public String getVersion(){return this.version;}
public void setVersion(String verisionIn){this.version=verisionIn;}
public String getAction(){return this.action;}
public void setAction(String actionIn){this.action=actionIn;}
public String getActionId(){return this.actionId;}
public void setActionId(String actionIdIn){this.actionId=actionIdIn;}
public String getBackupManagerId(){return this.backupManagerId;}
public void setBackupManagerId(String backupManagerIdIn){this.backupManagerId=backupManagerIdIn;}
public String getStatus(){return this.status;}
public void setStatus(String statusIn){this.status=statusIn;}
public String toString(){
return "Version: " + version + ", Action: " + action +", BackupManagerId: " + backupManagerId + ", Status: " +status;
}
}

是否有可能某些配置是错误的,也许是JsonDeserializer类映射?有没有可能&;extra_data&;会给我带来麻烦吗?链接到kafka并列出有关主题的消息,我只看到像上面这样格式化的消息。
是什么导致了这个错误?建议吗?

您可以通过将trusted_packages *的值替换为。另外,使用StringJsonMessageConverter使用StringDeserializerByteArrayDeserializer而不是JsonDeserializer.

props.put(JsonDeserializer.TRUSTED_PACKAGES,"<path-to-your-package>");

见此answer1, Answer2。

相关内容

最新更新