confluent - kafka-connect - JDBC 源连接器 - ORA-00933:SQL 命令未正确结



我的kafka jdbc源连接器属性文件中有以下sql查询:

query=SELECT * FROM JENNY.WORKFLOW where ID = '565231'

如果我在 sql 开发人员中运行相同的查询,它可以正常工作并获取结果。但是如果我在"jdbc_workflow_connect.properties"中使用相同的查询,则会出现以下错误:

(io.confluent.connect.jdbc.source.JdbcSourceTaskConfig:223)
[2018-09-19 12:32:15,130] INFO WorkerSourceTask{id=Workflow-DB-source-0} 
Source task finished initialization and start 
(org.apache.kafka.connect.runtime.WorkerSourceTask:158)
[2018-09-19 12:32:15,328] ERROR Failed to run query for table 
TimestampIncrementingTableQuerier{name='null', query='SELECT * FROM 
JENNY.WORKFLOW where ID = '565231'', topicPrefix='workflow_data1', 
timestampColumn='null', incrementingColumn='ID'}: {} 
(io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:776)
at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:897)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1034)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820)
at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3867)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1502)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

这是我的 JDBC 源连接器属性文件内容:

name=Workflow-DB-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.password = ******
connection.url = jdbc:oracle:thin:@1.1.1.1:****/****
connection.user = *****
table.types=TABLE
query=SELECT * FROM JENNY.WORKFLOW where ID = '565231'
mode=incrementing
incrementing.column.name=ID
topic.prefix=workflow_data1
timestamp.delay.interval.ms=60000
transforms:createKey
transforms.createKey.type:org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields:ID

我正在使用 ojdbc7.jar

观察:

如果我删除"WHERE"子句,查询工作正常(如下所示(:

SELECT * FROM JENNY.WORKFLOW

如果我做错了什么或在 jdbc 源连接器中进行设置所需的任何修改,请告诉我。

提前谢谢。

从 JDBC Connect 配置选项的文档中,您可以阅读

如果指定,则为选择新行或更新行而执行的查询。如果要联接表、选择表中的列子集或筛选数据,请使用此设置。如果使用,此连接器将仅使用此查询复制数据 - 将禁用全表复制。增量更新仍可能使用不同的查询模式,但为了正确构造增量查询,必须能够将 WHERE 子句附加到此查询(即不能使用 WHERE 子句(。

因此,如果您真的只想考虑具有给定ID的表部分,则必须按如下方式包装查询

select * from (SELECT * FROM JENNY.WORKFLOW where ID = '565231')

但请确保您检查了配置选项的文档,并且知道query参数的作用。

相关内容

  • 没有找到相关文章

最新更新