背景
首先,我想了解更多关于卡夫卡和卡夫卡之间的联系。本着这种精神,我正在关注米克尔·迈森和凯特·斯坦利的一本早期发行的书《卡夫卡连接》。
运行连接器
很早的时候(第2章-连接数据管道中的组件(,他们给出了一个"如何运行连接器"的例子。请注意,作者没有使用Confluent。在早期阶段,我们建议创建一个名为sink-config.json的文件,然后创建一个称为主题的主题以导出,并使用以下代码行:
bin/kafka-topics.sh --bootstrap-server localhost:9092
--create --replication-factor 1 --partitions 1 --topic topic-to-export
然后我们被指示";使用Connect REST API来启动具有您创建的配置的连接器;
$ curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/file-sink/config --data "@sink-config.json"
错误
然而,当我运行这个命令时,它会出现以下错误:
{"error_code":500,"message":"Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 36] (through reference chain: java.util.LinkedHashMap["config"])"}
正在尝试修复错误
请记住,我仍在努力学习Kafka和Kafka Connect,我做了一个非常简单的搜索,这让我在StackOverflow上看到了一篇帖子,似乎表明这应该是post而不是PUT。但是,将其更改为:
curl -d @sink-config.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors/file-sink/config
只会引发另一个错误:
{"error_code":405,"message":"HTTP 405 Method Not Allowed"}
我真的不确定从这里到哪里去,因为这"似乎"是你应该能够让连接器运行的方式。例如,Baeldung对连接器的介绍似乎也指定了这种做事方式。
有人知道发生了什么事吗?我不知道从哪里开始。。。
首先,感谢您查看本书的早期访问版本。
你在这个例子中发现了一个错误!
要启动连接器,建议使用PUT /connectors/file-sink/config
端点,但是我们提供的示例JSON不正确。
JSON文件应该类似于:
{
"name": "file-sink",
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": 1,
"topics": "topic-to-export",
"file": "/tmp/sink.out",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
错误的出现是因为还有另一个端点可以用来启动连接器POST /connectors
,而我们提供的JSON就是用于该端点的。
我们建议您使用PUT /connectors/file-sink/config
,因为相同的端点也可以用于重新配置连接器。此外,相同的JSON文件也可以与PUT /connector-plugins/{connector-type}/config/validate
端点一起使用。
再次感谢您发现并报告错误,我们将在未来几周内修复此示例。我们还将很快回复您关于其他问题的电子邮件。