我以parquet
格式从S3读取数据,然后将此数据作为DataFrame
处理。问题是如何有效地迭代行在DataFrame
?我知道collect
方法将数据加载到内存中,因此,尽管我的DataFrame
不大,但我宁愿避免将完整的数据集加载到内存中。我如何优化给定的代码?此外,我正在使用索引访问DataFrame
中的列。我可以通过列名访问它们吗(我知道它们)?
DataFrame parquetFile = sqlContext.read().parquet("s3n://"+this.aws_bucket+"/"+this.aws_key_members);
parquetFile.registerTempTable("mydata");
DataFrame eventsRaw = sqlContext.sql("SELECT * FROM mydata");
Row[] rddRows = eventsRaw.collect();
for (int rowIdx = 0; rowIdx < rddRows.length; ++rowIdx)
{
Map<String, String> props = new HashMap<>();
props.put("field1", rddRows[rowIdx].get(0).toString());
props.put("field2", rddRows[rowIdx].get(1).toString());
// further processing
}
您可以在spark中使用Map
功能。您可以迭代整个数据帧而不收集dataset/dataframe:
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age
BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map((MapFunction<Row, String>) row -> "Name:" + row.getString(0),Encoders.STRING());
namesDS.show();
如果操作比较复杂,可以创建map函数:
// Map function
Row doSomething(Row row){
// get column
String field = row.getAs(COLUMN)
// construct a new row and add all the existing/modified columns in the row .
return row.
}
现在这个map函数可以被调用到dataframe的map函数中:
StructType structType = dataset.schema();
namesDF.map((MapFunction<Row, Row>)dosomething,
RowEncoder.apply(structType))
来源:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html