我正在使用 kafka 连接到从 DB2 到 kafka 主题的源数据,我正在配置 sql 查询以从 DB2 读取数据,下面是查询
SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL'
我在这里使用设置"timestamp.column.name": "CREATE_TS"
问题在于查询中已经有WHERE
子句,Kafka Connect 试图添加另一个带有时间戳列的 where 子句,它正在产生问题,另一个问题是如果我从 SQL 子句中删除 where 子句,如下所示
SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR
然后我得到 substr 的错误,如下所示
SQL Error [22011]: THE SECOND OR THIRD ARGUMENT OF THE SUBSTR OR SUBSTRING FUNCTION IS OUT OF RANGE. SQLCODE=-138, SQLSTATE=22011, DRIVER=4.19.26
任何人都可以在这两个问题上提出建议,我卡在这一点上.
发生这种情况是因为您尝试同时使用"mode": "timestamp"
和query
。TimestampIncrementingTableQuerier
将一个WHERE
与query
中的现有WHERE
子句冲突的查询追加到查询中
。JDBC 源连接器文档对此很清楚:
query
如果指定,则为选择新行或更新行而执行的查询。用 如果要联接表,请在 表或筛选数据。如果使用,此连接器将仅复制数据 使用此查询 -- 将禁用全表复制。不同 查询模式仍可用于增量更新,但为了 正确构造增量查询,必须能够 将 WHERE 子句附加到此查询(即没有 WHERE 子句 使用)。如果使用 WHERE 子句,它必须处理增量查询 本身。
作为一种解决方法,您可以将查询修改为(取决于您使用的 SQL 风格)
SELECT * FROM ( SELECT * FROM table WHERE ...)
或
WITH a AS
SELECT * FROM b
WHERE ...
SELECT * FROM a
例如,在您的情况下,查询应该是
"query":"SELECT * FROM (SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL') o"