我正在尝试在分布式模式下运行Kafka工作线程。与独立模式不同,在分布式模式下启动工作线程时,我们无法传递连接器属性文件。在分布式模式下,工作线程是单独启动的,我们使用 REST API 在这些工作线程上部署和管理连接器
参考链接 - https://docs.confluent.io/current/connect/managing/configuring.html#connect-managing-distributed-mode
我尝试通过在curl命令中传递以下值来构建连接器并执行它
curl -X POST -H "Content-Type: application/json" --data '{"name":"sailpointdb","connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max":"1","connection.password " : " abc","connection.url " : "jdbc:mysql://localhost:3306/db","connection.user " : "abc" ,"query" : " SELECT * FROM (SELECT NAME, FROM_UNIXTIME(completed/1000) AS
TASKFAILEDON FROM abc WHERE COMPLETION_STATUS = 'Error') as A","mode" : " timestamp","timestamp.column.name" : "TASKFAILEDON","topic.prefix" : "dbevents","validate.non.null" : "false" }}' http://localhost:8089/connectors/
我收到以下错误 - curl:(3( 使用错误/非法格式或缺少 URL 的 URL
请让我知道上面的 curl 语句有什么问题,我在这里错过了什么吗
- 您的 JSON 中有一个额外的右大括号,这无济于事
- 如果要
POST
/connectors
则需要name
和config
根级别元素。但是,我建议使用PUT
/config
,因为如果需要,您可以重新运行它以更新配置
试试这个:
curl -X PUT -H "Content-Type:application/json"
http://localhost:8089/connectors/source-jdbc-sailpointdb-00/config
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.password ": " abc",
"connection.url ": "jdbc:mysql://localhost:3306/db",
"connection.user ": "abc",
"query": " SELECT * FROM (SELECT NAME, FROM_UNIXTIME(completed/1000) AS TASKFAILEDON FROM abc WHERE COMPLETION_STATUS = 'Error') as A",
"mode": " timestamp",
"timestamp.column.name": "TASKFAILEDON",
"topic.prefix": "dbevents",
"validate.non.null": "false"
}'