Kafka 架构注册表收到错误意外字符("<"(代码 60)):预期有效值(数字、字符串、数组、对象、"真"、"假")



我正在尝试将SpringBoot Application与Kafka Schema Registry集成。我创建了一个 kafkaPrducer,它将在验证到模式注册表后向 Kafka 主题发送消息:

public class Producer {
@Value("${topic.name}")
private final String TOPIC;
private final KafkaTemplate<Integer, Data> kafkaTemplate;
@Autowired
public Producer(KafkaTemplate<Integer, Data> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendTestEvent(Data data) throws Exception {
System.out.println("started");
Integer key = data.getTestEventId();
this.kafkaTemplate.send(this.TOPIC,key,data);
}

我的应用程序属性文件

server.port=8084
topic.name=test-topic
server.servlet.context-path=/api/v1
spring.application.name=kafkatest
spring.kafka.bootstrap-servers=*************.com:9093
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol= SASL_SSL
spring.kafka.properties.security.krb5.config = file:/etc/krb5.conf
spring.kafka.properties.sasl.mechanism = GSSAPI
spring.kafka.properties.sasl.kerberos.service.name= kafka
spring.kafka.properties.sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required 
useTicketCache=false serviceName="kafka" storeKey=true principal="***************" useKeyTab=true 
keyTab="/home/api/config/kafkaclient.keytab";
spring.kafka.ssl.trust-store-location= file:/home/api/config/truststore.p12
spring.kafka.ssl.trust-store-password=*********************
spring.kafka.ssl.trust-store-type= PKCS12
spring.kafka.basic.auth.credentials.source=USER_INFO 
spring.kafka.basic.auth.user.info=<username>:<password>
spring.kafka.schema.registry.url=https://schema-registry-*****************/subjects/test- 
topic/versions/latest

但是我收到错误:

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
> 2020-06-28 22:24:59.047  INFO 2019 --- [nio-8084-exec-1] o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
2020-06-28 22:24:59.054  INFO 2019 --- [ha*********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT refresh thread started.
2020-06-28 22:24:59.065  INFO 2019 --- [ha**********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT valid starting at: Sun Jun 28 22:25:07 IST 2020
2020-06-28 22:24:59.067  INFO 2019 --- [ha*********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT expires: Mon Jun 29 08:25:07 IST 2020
2020-06-28 22:24:59.072  INFO 2019 --- [ha*********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT refresh sleeping until: Mon Jun 29 06:28:47 IST 2020
2020-06-28 22:24:59.307  WARN 2019 --- [nio-8084-exec-1] o.a.k.clients.producer.ProducerConfig    : The configuration 'security.krb5.config' was supplied but isn't a known config.
2020-06-28 22:24:59.334  INFO 2019 --- [nio-8084-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2020-06-28 22:24:59.340  INFO 2019 --- [nio-8084-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14************
2020-06-28 22:25:03.041  INFO 2019 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: GlUYY***********
2020-06-28 22:25:04.994 ERROR 2019 --- [nio-8084-exec-1] o.a.c.c.C.[.[.[.[dispatcherServlet]      : Servlet.service() for servlet [dispatcherServlet] in context with path [/api/v1] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Event","namespace":"com.*******.kafka.avro.event.sample","fields":[{"name":"event_envelope","type":{"type":"record","name":"EventEnvelope","fields":[{"name":"data","type":{"type":"record","name":"Data","fields":[{"name":"testEventId","type":"int"},{"name":"test","type":{"type":"string","avro.java.string":"String"}}]},"default":{}}]}}]}] with root cause
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70) ~[kafka-avro-serializer-5.3.0.jar!/:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.3.0.jar!/:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar!/:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar!/:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.1.jar!/:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.1.jar!/:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444) ~[spring-kafka-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381) ~[spring-kafka-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199) ~[spring-kafka-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
at io.confluent.developer.spring.avro.Producer.sendTestEvent(Producer.java:58) ~[classes!/:0.0.1-SNAPSHOT]
at io.confluent.developer.spring.avro.KafkaController.sendMessageToKafkaTopic(KafkaController.java:31) ~[classes!/:0.0.1-SNAPSHOT]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1039) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:836) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1747) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

由于我使用的是https模式注册表,我是否必须在application.properties中设置任何其他属性,如密钥库和trustore证书?

这是因为模式注册表已关闭。您需要在日志中验证确切的问题是什么。

用于查看架构注册表日志的命令:confluent local services schema-registry log -f(-f 使其上线( 就我而言,我有一个冲突的端口使用(另一个工具正在使用 8081(。日志会说话,只需要听到它。

在通过模式注册表进行验证后,我现在能够以 Avro 格式向 kafka 发布消息。

我犯的错误与架构的主题名称有关。默认情况下,Avro 架构使用TopicNameStrategy,这意味着它将查找看起来像此主题名称值的主题名称。如果未找到,它将引发错误,无法检索架构,因为该主题名称不存在架构。

我将架构的主题名称更改为测试主题,并能够将消息发布到测试主题上。

我希望这会有所帮助并节省某人的时间。

我遇到了同样的错误,在我看来,模式注册表不接受生产者和消费者推送的模式。

我在 CI 阶段注册了架构,并在代码中设置了以下属性:auto.register.schema=false

<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.3.0</version>
</dependency>

相关内容

最新更新