如何在 Java 中将映射列表<映射<字符串、字符串>> myList 的列表转换为 Spark 数据帧?



我有一个像这样的Map列表,

List<Map<String, Object>> myList = new ArrayList<>();
Map<String, Object> mp1 = new HashMap<>();
mp1.put("id", 1);
mp1.put("name", "John");
Map<String, Object> mp2 = new HashMap<>();
mp2.put("id", 2);
mp2.put("name", "Carte");

键-值我们放入映射中的对不是固定的,我们可以使用任意动态键值双(动态模式)。

我想把它转换成spark数据框架。行(Dataset<<strong>在) .

+--+--------+
|

您可以从映射列表中构建行和模式,然后使用spark.createDataFrame(rows: java.util.List[Row], schema: StructType)构建您的数据框架:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.types.*;
...
public static Dataset<Row> buildDataframe(List<Map<String, Object>> listOfMaps, SparkSession spark) {
// extract columns name list
Set<String> columnSet = new HashSet<>();
for (Map<String, Object> elem: listOfMaps) {
columnSet.addAll(elem.keySet());
}
List<String> columns = new ArrayList<>(columnSet);
// build rows
List<Row> rows = new ArrayList<>();
for (Map<String, Object> elem : listOfMaps) {
List<Object> row = new ArrayList<>();
for (String key: columns) {
row.add(elem.get(key));
}
rows.add(new GenericRow(row.toArray()));
}
// build schema
List<StructField> fields = new ArrayList<>();
for (String column: columns) {
fields.add(new StructField(column, getDataType(column, listOfMaps), true, Metadata.empty()));
}
StructType schema = new StructType(fields.toArray(new StructField[0]));
// build dataframe from rows and schema
return spark.createDataFrame(rows, schema);
}
public static DataType getDataType(String column, List<Map<String, Object>> data) {
for (Map<String, Object> elem : data) {
if (elem.get(column) != null) {
return getDataType(elem.get(column));
}
}
return DataTypes.NullType;
}
public static DataType getDataType(Object value) {
if (value.getClass() == Integer.class) {
return DataTypes.IntegerType;
} else if (value.getClass() == String.class) {
return DataTypes.StringType;
// TODO add all other spark types (Long, Timestamp, etc...)
} else {
throw new IllegalArgumentException("unknown type for value " + value);
}
}

相关内容

  • 没有找到相关文章

最新更新