我想在ApacheFlink应用程序中使用JDBC连接器。但是maven并没有找到那个令人望而却步的JDBC包。我在";构建罐子";区段:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.13.1</version>
</dependency>
jar文件由maven下载,并在本地maven目录中可用。我的代码看起来像这样。
// standard, not relevant flink imports
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
public class BatchLayerExec {
public static void main( final String[] args ) {
//Definition of Strings for the connection to the database
try {
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
final TypeInformation<?>[] fieldTypes =
new TypeInformation<?>[] { ... };
final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
//Define Input Format Builder
JDBCInputFormat.JDBCInputFormatBuilder inputBuilder = JDBCInputFormat
.buildJDBCInputFormat()
.setDrivername(driverName)
.setDBUrl(dbURL + sourceDB)
.setQuery(selectQuery)
.setRowTypeInfo(rowTypeInfo)
.setUsername(dbUser)
.setPassword(dbPassword)
.setRowTypeInfo(rowTypeInfo);
DataSet<Row> sourceTable = environment.createInput(inputBuilder.finish());
// Transformation
// ...
// Print for debugging
transformedTable.print();
// Output transformed data to output table
//Define Output Format Builder
JDBCOutputFormat.JDBCOutputFormatBuilder outputBuilder = JDBCOutputFormat
.buildJDBCOutputFormat()
.setDrivername(driverName)
.setDBUrl(dbURL + sourceDB)
.setQuery(insertQuery)
.setSqlTypes(new int[] { ... })
.setUsername(dbUser)
.setPassword(dbPassword);
//Define dataSink
transformedTable.output(outputBuilder.finish());
environment.execute();
} catch(final Exception e) {
System.out.println(e);
}
}
}
但在使用mvn clean package -Pbuild-jar
的构建过程中,我收到错误消息:CCD_ 2。我删除了代码中一些不相关的定义和步骤(请参阅注释(。如果您需要更多信息,请发表评论。
我发现包org.apache.flink.api.java.io.jdbc
已弃用。导入包org.apache.flink.connector.jdbc
有效
编辑
注意,这需要将JDBCInputFormat
和JDBCOutputFormat
类更改为JdbcInputFormat
和JdbcOutputFormat
。