在接收器连接器上反序列化 Avro 消息时出错



我在尝试从主题中使用 avro 消息并将它们转储到 postgres DB 时反序列化 Avro 消息时出错。

这是我的生产者配置:

spring
Kafka
producer:
bootstrap-servers: localhost:9092
key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
properties:
schema.registry.url: http://localhost:8081

接收器连接器配置:

{
"name": "jdbc_source_postgres_avro",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgres://localhost:5432/kafka-test",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.user": "postgres",
"connection.password": "password",
"topics": "docker-avro-topic",
"auto.create": "true",
"auto.offset.reset": "latest",
"name": "jdbc_source_postgres_avro"
}

用于连接的 docker 映像:

connect:
image: cnfldemos/kafka-connect-datagen:0.1.3-5.3.1
hostname: connect
container_name: connect
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

最后是错误日志:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded
in error handler    at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) Caused by:
org.apache.kafka.connect.errors.DataException: Failed to deserialize
data for topic docker-avro-topic to Avro:   at
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)...
13 more Caused by:
**org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 101 Caused by:
java.net.ConnectException: Connection refused (Connection refused)  at
java.net.PlainSocketImpl.socketConnect(Native Method)**     at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)   at
java.net.Socket.connect(Socket.java:589)    at
sun.net.NetworkClient.doConnect(NetworkClient.java:175)     at
sun.net.www.http.HttpClient.openServer(HttpClient.java:463)     at
sun.net.www.http.HttpClient.openServer(HttpClient.java:558)     at
sun.net.www.http.HttpClient.<init>(HttpClient.java:242)     at
sun.net.www.http.HttpClient.New(HttpClient.java:339)    at
sun.net.www.http.HttpClient.New(HttpClient.java:357     at
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
at
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
at
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
at
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
at
java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)

你已将 Kafka Connect 配置为查找在同一容器上运行的架构注册表:

"value.converter.schema.registry.url": "http://localhost:8081",

但事实并非如此,因此:

java.net.ConnectException: Connection refused (Connection refused)

由于您已经在 Kafka Connect docker 环境变量中指定了架构注册表配置,因此假设您确实在可以从 Kafka Connect 容器中解析为schema-registry的容器上运行架构注册表,您只需从实际的连接器配置中删除schema.registry.url配置行即可。

最新更新