CSV file from HDFS to Oracle BLOB using Spark



I'm developing a Java application that uses Spark 2.3.1 to load data from Oracle into HDFS and vice versa. I want to create a CSV file in HDFS and then load it into an Oracle (12.2) BLOB.

The code:

// create the Dataset
Dataset<Row> dataset = SparkService.sql("select * from test_table");
String trgtFileWithPath = "/tmp/test_table.csv";

// save the file in HDFS (note: Spark writes a directory of part files, not a single CSV)
dataset.write().mode("overwrite").format("csv").save(trgtFileWithPath);

// get the file from HDFS
JavaSparkContext jsc = SparkContextUtil.getJavaSparkContext("appId");
JavaRDD<String> textFile = jsc.textFile(trgtFileWithPath);

// call the Oracle package that inserts into a table with a BLOB field
// (java.io.File reads from the local filesystem here, not from HDFS)
File csvFile = new File("/tmp/ETLFramework/test_table1.csv");
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(csvFile), 500);
Connection conn = tbl.getJdbcConnection(); // tbl holds a java.sql.Connection
CallableStatement cstmt = conn.prepareCall(
        String.format("{call %s(?, ?, ?)}", "ORACLE_API_FOR_ETL_FRAMEWORK.INSERT_LOB"));
cstmt.setString(1, "FILE_TO_LOB");
cstmt.setString(2, "/tmp/test_table.csv");
cstmt.setBlob(3, bis, csvFile.length()); // target column is a BLOB, so setBlob rather than setClob
cstmt.execute();
if (!conn.getAutoCommit()) {
    conn.commit();
}

I'm new to Spark... so any ideas on how to convert the JavaRDD into a BufferedInputStream, or how to get rid of the mess above and load the Dataset into an Oracle BLOB in a saner way, would be appreciated.

Thanks

Finally, after a few days of fighting with Oracle, Hadoop, and Spark, I found a solution that works for my task:

import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.URI;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Types;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

try {
    String trgtFolderPath = "tmp/ETLFramework/csv/form_name";
    Configuration conf = new Configuration();
    String hdfsUri = "hdfs://" + nameNode + ":" + hdfsPort; // NameNode host and port supplied elsewhere
    FileSystem fileSystem = FileSystem.get(URI.create(hdfsUri), conf);

    // iterate over every part file Spark wrote into the target folder
    RemoteIterator<LocatedFileStatus> fileStatusListIterator =
            fileSystem.listFiles(new Path(trgtFolderPath), true);
    while (fileStatusListIterator.hasNext()) {
        LocatedFileStatus fileStatus = fileStatusListIterator.next();
        String fileName = fileStatus.getPath().getName();
        if (fileName.contains(".csv") && fileStatus.getLen() > 0) {
            log.info("fileName=" + fileName);
            log.info("fileStatus.getLen=" + fileStatus.getLen());

            // open the HDFS file directly as a stream; no local copy needed
            BufferedInputStream bis = new BufferedInputStream(
                    fileSystem.open(new Path(trgtFolderPath + "/" + fileName)), 500);

            ETLParams param = ETLParams.getParams();
            Connection conn = tbl.getJdbcConnection();
            String apiPackageInsertLOB = ETLService.replaceParams(
                    tbl.getConnection().getFullSchema() + "." + tbl.getApiPackage().getDbTableApiPackageInsertLOB(),
                    param.getParamsByName());
            log.info(String.format("Call %s(%s, %s, %s);",
                    apiPackageInsertLOB, tbl.getFullTableName(), trgtFolderPath + "/" + fileName, "p_nInsertedRows"));

            CallableStatement cstmt = conn.prepareCall(String.format("{call %s(?, ?, ?, ?)}", apiPackageInsertLOB));
            cstmt.setString(1, tbl.getFullTableName());
            cstmt.setString(2, trgtFolderPath + "/" + fileName);
            cstmt.setBlob(3, bis, fileStatus.getLen());
            cstmt.registerOutParameter(4, Types.INTEGER);
            cstmt.execute();

            int rowsInsertedCount = cstmt.getInt(4); // the OUT parameter was registered at index 4
            log.info("Inserted " + rowsInsertedCount + " rows into table blob_file");
            cstmt.close();
        }
    }
    fileSystem.close();
} catch (IOException | SQLException exc) {
    exc.printStackTrace();
}
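
Because Spark writes one part file per partition, the loop above may upload several CSV files per dataset. If a single file per load is preferable, the Dataset can be coalesced to one partition before writing. A minimal sketch, assuming the same query as in the question (the output path and header option are assumptions, not part of the original job):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Collapse the Dataset to one partition so Spark emits a single part file.
// Caveat: coalesce(1) funnels all rows through one task, which can be slow for very large datasets.
Dataset<Row> dataset = SparkService.sql("select * from test_table");
dataset.coalesce(1)
       .write()
       .mode("overwrite")
       .option("header", "true") // assumption: a header row is wanted in the CSV
       .format("csv")
       .save("/tmp/ETLFramework/csv/form_name");

The loop then finds exactly one part-*.csv file under the target folder.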

Writing a 2 GB CSV from a Spark Dataset to HDFS and then reading that CSV from HDFS into an Oracle BLOB takes about five minutes.
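
One detail worth revisiting if throughput matters: the BufferedInputStream in the solution uses a 500-byte buffer, which is tiny for a 2 GB transfer. A hedged tweak, reusing the fileSystem, trgtFolderPath, and fileName variables from the solution above:

// A larger buffer means fewer small reads between the HDFS client and the JDBC driver.
BufferedInputStream bis = new BufferedInputStream(
        fileSystem.open(new Path(trgtFolderPath + "/" + fileName)),
        1024 * 1024); // 1 MiB buffer instead of 500 bytes

The JDBC driver also streams the LOB in its own chunks, so the actual gain depends on the driver's settings; measuring before and after the change is the safe approach.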
