我已经写了代码从Oracle读取数据使用Apache Beam与Dataflow
我面对一个奇怪的错误为几个表,而摄取数据。有各种各样的表,有200多个列,不同的数据类型,如Date
,Number
,VARCHAR2(***)
在Oracle。
我们的要求是通过数据流迁移所有列BigQuery能够。当我们在select Query中选择所有列时,它会给出下面提到的Null Pointer exception
错误。所以我们尝试在查询中使用选择性列但是在这种情况下
- 当所选列的集合数据类型大小小于~46752 Bytes时,Pipeline运行成功。
- ,当它超过这个限制,然后它开始给出一个空指针错误。
如果我更详细地解释,如果有2000字节的限制(假设,我们得到的实际值大约是46752字节),那么我们将只能选择两列VARCHAR2(1000)数据类型或4列VARCHAR2(500)数据类型,等等。
注意—阈值限制~46752字节,我们通过在Query中逐个添加列并执行代码计算得出。
我们不确定Apache Beam的Java JDBC连接器是否有这样的限制,但是当我们选择的列超过这个限制时,我们在迁移时面临挑战。
请帮助我,如果我错过了任何点在这里或任何参数,而通过JdbcIO
读取数据。
下面是代码给出错误的代码片段。它是我们管道的入口点。代码在只读时给出错误。在下面的代码中,我没有提到BigQuery的写操作,因为它没有被执行,因为JdbcIO.read()
只有失败。(通过注释BigQuery的逻辑来交叉检查)
从Oracle代码中读取数据
// Read from JDBC
Pipeline p2 = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
String query2 = "SELECT Col1...Col00 FROM table WHERE rownum<=1000";
PCollection<TableRow> rows = p2.apply(JdbcIO.<TableRow>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"oracle.jdbc.OracleDriver", "jdbc:oracle:thin:@//localhost:1521/orcl")
.withUsername("root")
.withPassword("password"))
.withQuery(query2)
.withRowMapper(new JdbcIO.RowMapper<TableRow>() {
@Override
public TableRow mapRow(ResultSet resultSet) throws Exception {
schema = getSchemaFromResultSet(resultSet);
TableRow tableRow = new TableRow();
List<TableFieldSchema> columnNames = schema.getFields();
}
return tableRow;
}
)
);
p2.run().waitUntilFinish();
错误(仅当超过列中字节的限制时)
Error message from worker: java.lang.NullPointerException
oracle.sql.converter.CharacterConverter1Byte.toUnicodeChars(CharacterConverter1Byte.java:344)
oracle.sql.CharacterSet1Byte.toCharWithReplacement(CharacterSet1Byte.java:134)
oracle.jdbc.driver.DBConversion._CHARBytesToJavaChars(DBConversion.java:964)
oracle.jdbc.driver.DBConversion.CHARBytesToJavaChars(DBConversion.java:867)
oracle.jdbc.driver.T4CVarcharAccessor.unmarshalOneRow(T4CVarcharAccessor.java:298)
oracle.jdbc.driver.T4CTTIrxd.unmarshal(T4CTTIrxd.java:934)
oracle.jdbc.driver.T4CTTIrxd.unmarshal(T4CTTIrxd.java:853)
oracle.jdbc.driver.T4C8Oall.readRXD(T4C8Oall.java:699)
oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:337)
oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:191)
oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:523)
oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:207)
oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:863)
oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1153)
oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1275)
oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3576)
oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3620)
oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java
:1491)
org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java
:122)
org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java
:122) org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:1381)
- Oracle Version - 11g Apache Beam SDK - Java
- 有一些列,我们选择有空值也但是这应该不会引起任何问题。
- 没有特定列的问题,因为我已经尝试了所有可能的列组合。
- 没有列数选择限制,因为我能够从列数大于300但总字节大小小于46752字节的Oracle表中读取数据。
jdbc源转换有一个名为FetchSize
的属性,该属性定义了可以从数据库获取多少数据。缺省值是50000字节。我们可以使用withFetchSize(int fetchSize)
方法更改运行时值。Docs .