使用Flink从oracle读取数据



我正在尝试使用Flink与Oracle合作。只需执行一个简单的任务,将数据从表复制到新表。

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE ExistedTable(n" +
"    quoteid  BIGINT,n" +
"    requestid      BIGINT,n" +
"    createddt DATE,n" +           
"    PRIMARY KEY (quoteid) NOT ENFORCEDn" +
") WITH (n" +
"    'connector' = 'jdbc',n" +
"    'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',n" +
"    'table-name'    = 'TableName',n" +
"   'driver'     = 'oracle.jdbc.driver.OracleDriver',n" +
"    'username'    = 'UserName',n" +
"    'password'    = 'Password'n" +
")");
tEnv.executeSql("CREATE TABLE NewTable (n" +
"    quoteid  BIGINT,n" +
"    requestid      BIGINT,n" +
"    createddt DATE,n" +           
"    PRIMARY KEY (quoteid) NOT ENFORCEDn" +
") WITH (n" +
"    'connector' = 'jdbc',n" +
"    'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',n" +
"    'table-name'    = 'NewTableName',n" +
"   'driver'     = 'oracle.jdbc.driver.OracleDriver',n" +
"    'username'    = 'UserName',n" +
"    'password'    = 'Password'n" +
")");
Table data= tEnv.from("ExistedTable");
data.executeInsert("NewTable");

运行时出现错误

The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.xxx'.
Table options are:
'connector'='jdbc'
'driver'='oracle.jdbc.OracleDriver'
'password'='******'
'table-name'='xxx'
'url'='jdbc:oracle:thin:@xxx:1521:xxx'
'username'='xxx'
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.xxx'.

我的sqlconnection中有错误吗。我找不到任何与oracle合作的例子。谢谢,

您正在使用哪个版本的Flink?对OracleJDBC的支持是从Flink1.15开始提供的,它还没有发布。

最新更新