Kafka JDBC Connect (Source and Sink) and Informix



有没有办法转换"; "以"!"在由JDBC连接器构造的查询中?

"SELECT * FROM database: user: table

连接器配置:

"name": "jdbc_source_connector",
"config": {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:informix-sqli://IP:PORT/databasa:informixserver=oninit;user=user;password=password",
"topic.prefix" : "table-",
"poll.interval.ms" : "100000",
"mode" : "incrementing",
"table.whitelist" : "table",
"query.suffix" : ";",
"incrementing.column.name" : "lp"

错误:

[2021-02-03 14:03:02,809] INFO Begin using SQL query: SELECT * FROM  database  :  user  :  table  WHERE  database  :  user  :  table : lp  > ? ORDER B
Y  database  :  user  :  table : lp  ASC ; (io.confluent.connect.jdbc.source.TableQuerier:164)
[2021-02-03 14:03:02,853] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table="database "." user "." table", query='null', top
icPrefix='database-', incrementingColumn='lp', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:404)
java.sql.SQLSyntaxErrorException: A syntax error has occurred.

连接器会话:

informix database:/usr/informix$ onstat -g ses 544271
IBM Informix Dynamic Server Version 12.10.FC13 -- On-Line -- Up 60 days 18:26:06 -- 4985440 Kbytes
session           effective                                               #RSAM    total      used       dynamic
id       user     user      tty      pid      hostname                    threads  memory     memory     explain
544271   user      -         -        1266352  kafkahost                  1        172032     102672     off
Program :
Thread[id:175, name:task-thread-jdbc_source_connector_database, path:/app/kafka_2.13-2.7.0/plugins/kafka-connect-jdbc-10.0.1/lib/jdbc-4.50.4.1.jar]
tid      name     rstcb            flags    curstk   status
583042   sqlexec  7000000437451a8  Y--P---  6224     cond wait  netnorm   -
Memory pools    count 2
name         class addr              totalsize  freesize   #allocfrag #freefrag
544271       V     70000004f6c7040  167936     68592      113        35
544271*O0    V     700000065ad0040  4096       768        1          1
name           free       used           name           free       used
overhead       0          6656           scb            0          144
opentable      0          11352          filetable      0          1040
log            0          16536          temprec        0          22688
keys           0          816            gentcb         0          1592
ostcb          0          3472           sqscb          0          25128
hashfiletab    0          552            osenv          0          2056
sqtcb          0          9336           fragman        0          640
sapi           0          144            udr            0          520
sqscb info
scb              sqscb            optofc   pdqpriority optcompind  directives
7000000343d3360  700000039194028  0        0           0           1
Sess       SQL            Current            Iso Lock       SQL  ISAM F.E.
Id         Stmt type      Database           Lvl Mode       ERR  ERR  Vers  Explain
544271     -              database                CR  Not Wait   0    0    9.28  Off
Last parsed SQL statement :
SELECT * FROM  database  :  user  :  table  WHERE  database  :  user  :  table :
lp  > ? ORDER BY  database  :  user  :  table : lp  ASC

所以,唯一的解决方法是将查询选项添加到连接器配置

修改后的config:

"name": "jdbc_source_connector",
"config": {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:informix-sqli://IP:PORT/databasa:informixserver=oninit;user=user;password=password",
"topic.prefix" : "table-tablename",
"poll.interval.ms" : "100000",
"mode" : "incrementing",
"query" : "select * from table ",
"incrementing.column.name" : "lp"

最新更新