Spring kstream 无法使处理器工作 - 类'[B'不在受信任的包中



完整代码:https://github.com/BenedictWHD/kstreams-example

因此,我有一个生产者(data-ingest(、处理器(external-message-processor(和消费者(internal-message-processor(一旦我开始工作,它稍后将成为处理器,所以很抱歉现在命名,但它是消费者(。

data-ingest向主题external_messages发送消息时,它根据我所能告诉的来工作。

external-message-processor尝试读取该主题,但失败,出现以下情况:

Caused by: java.lang.IllegalArgumentException: The class '[B' is not in the trusted packages: [java.util, java.lang, com.yetti.common.externalmessage, com.yetti.common.externalmessage.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

主题信息示例:

Headers: __TypeId__: [B, contentType: application/json, spring_json_header_types: {"contentType":"java.lang.String"}"eyJpZCI6IjE4ZGQ2ODc4LWYwNWQtNDJiOC1iYTdlLTU2MDhmMTkzOWU3YyIsImV4dGVybmFsTWVzc2FnZVNvdXJjZSI6IlNNUyIsIm1lc3NhZ2VUeXBlIjoiVFJBTlNBQ1RJT04iLCJudW1iZXJGcm9tIjoiMSIsIm51bWJlclRvIjoiMiIsImNjeSI6Ik5UVEwiLCJxdWFudGl0eSI6IjIuNSJ9"

正如你所看到的,TypeId出于某种原因是";[B〃.

我已经为所有3个应用程序指定了使用以下序列化程序和反序列化程序:

serializer: org.springframework.kafka.support.serializer.JsonSerializer
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

当应用程序在生产者和消费者配置中启动时,似乎只有data-ingest实际上使用了与其他应用程序相同的正确序列化程序

value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

从我在application.yml文件中看到的情况来看,一切都是应该的,所以我不知道为什么它没有使用我指定的序列化程序/反序列化程序,以及为什么处理器无法读取主题的消息?

感谢您的帮助,因为我已经为此挠头好几天了。

一开始的repo包含所有的配置文件、pom和用于运行它的docker composer文件。

编辑-处理器配置:


spring.cloud.stream:
function:
definition: processExternalMessage
bindings:
processExternalMessage-in-0:
destination: external_messages
processExternalMessage-out-0:
destination: internal_messages
kafka:
bindings:
processExternalMessage-out-0:
producer:
configuration:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
processExternalMessage-in-0:
consumer:
configuration:
value:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

生产者配置:


spring.cloud.stream:
function:
definition: externalMessageProducer
bindings:
externalMessageProducer-out-0:
destination: external_messages
kafka:
bindings:
externalMessageProducer-out-0:
producer:
configuration:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer

value.serializer是一个平面配置属性名称。value不是带有serializer字段的嵌套对象,在YAML术语中

这就是为什么另一个似乎工作

同样值得指出的是,Kstreams使用serde属性,而不是直接使用的序列化程序

最新更新