在Apache Beam [DataFlow]中读取JdbcIO的空指针异常



我已经写了代码从Oracle读取数据使用Apache Beam与Dataflow

我面对一个奇怪的错误为几个表,而摄取数据。有各种各样的表,有200多个列,不同的数据类型,如Date,Number,VARCHAR2(***)在Oracle。

我们的要求是通过数据流迁移所有列BigQuery能够。当我们在select Query中选择所有列时,它会给出下面提到的Null Pointer exception错误。所以我们尝试在查询中使用选择性列但是在这种情况下

  • 当所选列的集合数据类型大小小于~46752 Bytes时,Pipeline运行成功。
  • ,当它超过这个限制,然后它开始给出一个空指针错误。

如果我更详细地解释,如果有2000字节的限制(假设,我们得到的实际值大约是46752字节),那么我们将只能选择两列VARCHAR2(1000)数据类型或4列VARCHAR2(500)数据类型,等等。

注意—阈值限制~46752字节,我们通过在Query中逐个添加列并执行代码计算得出。

我们不确定Apache Beam的Java JDBC连接器是否有这样的限制,但是当我们选择的列超过这个限制时,我们在迁移时面临挑战。

请帮助我,如果我错过了任何点在这里或任何参数,而通过JdbcIO读取数据。

下面是代码给出错误的代码片段。它是我们管道的入口点。代码在只读时给出错误。在下面的代码中,我没有提到BigQuery的写操作,因为它没有被执行,因为JdbcIO.read()只有失败。(通过注释BigQuery的逻辑来交叉检查)

从Oracle代码中读取数据

// Read from JDBC
Pipeline p2 = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
String query2 = "SELECT Col1...Col00 FROM table WHERE rownum<=1000";
PCollection<TableRow> rows = p2.apply(JdbcIO.<TableRow>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"oracle.jdbc.OracleDriver", "jdbc:oracle:thin:@//localhost:1521/orcl")
.withUsername("root")
.withPassword("password"))
.withQuery(query2)
.withRowMapper(new JdbcIO.RowMapper<TableRow>() {
@Override
public TableRow mapRow(ResultSet resultSet) throws Exception {
schema = getSchemaFromResultSet(resultSet);
TableRow tableRow = new TableRow();
List<TableFieldSchema> columnNames = schema.getFields();    
}
return tableRow;
}
)
);
p2.run().waitUntilFinish();

错误(仅当超过列中字节的限制时)

Error message from worker: java.lang.NullPointerException
oracle.sql.converter.CharacterConverter1Byte.toUnicodeChars(CharacterConverter1Byte.java:344) 
oracle.sql.CharacterSet1Byte.toCharWithReplacement(CharacterSet1Byte.java:134)   
oracle.jdbc.driver.DBConversion._CHARBytesToJavaChars(DBConversion.java:964) 
oracle.jdbc.driver.DBConversion.CHARBytesToJavaChars(DBConversion.java:867)  
oracle.jdbc.driver.T4CVarcharAccessor.unmarshalOneRow(T4CVarcharAccessor.java:298)  
oracle.jdbc.driver.T4CTTIrxd.unmarshal(T4CTTIrxd.java:934)     
oracle.jdbc.driver.T4CTTIrxd.unmarshal(T4CTTIrxd.java:853)     
oracle.jdbc.driver.T4C8Oall.readRXD(T4C8Oall.java:699)     
oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:337)     
oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:191)     
oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:523)     
oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:207)     
oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:863)     
oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1153)     
oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1275)     
oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3576)     
oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3620)     
oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java
:1491)     
org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java
:122)     
org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java
:122)     org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:1381)

  • Oracle Version - 11g
  • Apache Beam SDK - Java
  • 有一些列,我们选择有空值也但是这应该不会引起任何问题。
  • 没有特定列的问题,因为我已经尝试了所有可能的列组合。
  • 没有列数选择限制,因为我能够从列数大于300但总字节大小小于46752字节的Oracle表中读取数据。

jdbc源转换有一个名为FetchSize的属性,该属性定义了可以从数据库获取多少数据。缺省值是50000字节。我们可以使用withFetchSize(int fetchSize)方法更改运行时值。Docs .

相关内容

  • 没有找到相关文章

最新更新