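I am trying to use Kafka Connect to sink data into PostgreSQL, but I get an error saying that a schema does not exist.
Could the dots in the topic name be the problem? The error says that schema "logstash" does not exist, and "logstash" is the part of the topic name up to the first dot.
Error: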
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: org.postgresql.util.PSQLException: ERROR: schema "logstash" does not exist
Position: 14
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
... 10 more
Caused by: java.sql.SQLException: org.postgresql.util.PSQLException: ERROR: schema "logstash" does not exist
Position: 14
... 12 more
Sink configuration:
{
"name": "jdbc.apache.access.log.sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"topics": "logstash.apache.access.log",
"connection.url": "jdbc:postgresql://<IP_OF_POSTGRESQL>:5432/kafka",
"connection.user": "kafka",
"connection.password": "<PASSWORD>",
"insert.mode": "upsert",
"pk.mode": "kafka",
"auto.create": true,
"auto.evolve": true,
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true"
}
}
Schema (retrieved with an API call):
{
"subject": "logstash.apache.access.log-value",
"version": 3,
"id": 3,
"schema": "{"type":"record","name":"log","namespace":"value_logstash.apache.access","fields":[{"name":"clientip","type":["null","string"],"default":null},{"name":"verb","type":["null","string"],"default":null},{"name":"response","type":["null","string"],"default":null},{"name":"request","type":["null","string"],"default":null},{"name":"bytes","type":["null","string"],"default":null}]}"
}
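Edit:
I tried creating a new topic that uses underscores instead. It looks like the dots are indeed what causes the error. Is there a way to work around this, or is my configuration wrong?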
You should be able to use the RegexRouter SMT to rename the topic to something without periods before the sink writes to the database.
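A minimal sketch of what that could look like, assuming the topic always has exactly four dot-separated segments, as in logstash.apache.access.log. The transform alias "dropDots" is just a name picked for this example. RegexRouter only rewrites the topic when the regex matches the entire topic name, so the pattern has to cover every segment. These keys would be added inside the "config" object of the sink:

```json
{
  "transforms": "dropDots",
  "transforms.dropDots.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.dropDots.regex": "([^.]+)\\.([^.]+)\\.([^.]+)\\.([^.]+)",
  "transforms.dropDots.replacement": "$1_$2_$3_$4"
}
```

With this in place the sink should see the topic as logstash_apache_access_log and use that as the table name, so PostgreSQL no longer treats "logstash" as a schema qualifier. A topic with a different number of segments would not match this pattern and would pass through unchanged.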