I have a list of maps like this:
List<Map<String, Object>> myList = new ArrayList<>();

Map<String, Object> mp1 = new HashMap<>();
mp1.put("id", 1);
mp1.put("name", "John");
myList.add(mp1);

Map<String, Object> mp2 = new HashMap<>();
mp2.put("id", 2);
mp2.put("name", "Carte");
myList.add(mp2);
The key-value pairs we put into the maps are not fixed; they can be arbitrary dynamic key-value pairs (a dynamic schema).
I want to convert it into a Spark DataFrame (Dataset<Row>), like this:

+---+-----+
| id| name|
+---+-----+
|  1| John|
|  2|Carte|
+---+-----+
How can I achieve this?
Note: As I said, the key-value pairs are dynamic, so I cannot create a Java bean in advance and use the following syntax:
Dataset<Row> ds = spark.createDataFrame(myList, MyClass.class);
You can build the rows and the schema from your list of maps, then use spark.createDataFrame(rows: java.util.List[Row], schema: StructType) to build your DataFrame:
import java.util.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.types.*;

...
public static Dataset<Row> buildDataframe(List<Map<String, Object>> listOfMaps, SparkSession spark) {
    // collect the set of column names across all maps (the schema is dynamic)
    Set<String> columnSet = new HashSet<>();
    for (Map<String, Object> elem : listOfMaps) {
        columnSet.addAll(elem.keySet());
    }
    List<String> columns = new ArrayList<>(columnSet);

    // build one Row per map, in column order; missing keys become null
    List<Row> rows = new ArrayList<>();
    for (Map<String, Object> elem : listOfMaps) {
        List<Object> row = new ArrayList<>();
        for (String key : columns) {
            row.add(elem.get(key));
        }
        rows.add(new GenericRow(row.toArray()));
    }

    // build the schema: every field is nullable, typed from the data
    List<StructField> fields = new ArrayList<>();
    for (String column : columns) {
        fields.add(new StructField(column, getDataType(column, listOfMaps), true, Metadata.empty()));
    }
    StructType schema = new StructType(fields.toArray(new StructField[0]));

    // build the DataFrame from the rows and the schema
    return spark.createDataFrame(rows, schema);
}
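For example, calling it on the myList from the question (with both maps added to the list) prints something like this; note that the column order is not guaranteed, since the names come out of a HashSet:

Dataset<Row> ds = buildDataframe(myList, spark);
ds.show();

// +---+-----+
// | id| name|
// +---+-----+
// |  1| John|
// |  2|Carte|
// +---+-----+

The getDataType helpers pick a Spark type for each column from its first non-null value: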
public static DataType getDataType(String column, List<Map<String, Object>> data) {
    // use the first non-null value to determine the column's type
    for (Map<String, Object> elem : data) {
        if (elem.get(column) != null) {
            return getDataType(elem.get(column));
        }
    }
    // column is null in every map
    return DataTypes.NullType;
}
public static DataType getDataType(Object value) {
    if (value.getClass() == Integer.class) {
        return DataTypes.IntegerType;
    } else if (value.getClass() == String.class) {
        return DataTypes.StringType;
    // TODO add all other Spark types (Long, Timestamp, etc...)
    } else {
        throw new IllegalArgumentException("unknown type for value " + value);
    }
}
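To fill in that TODO, you can extend the same method with whichever types your maps may carry. A minimal, non-exhaustive sketch (using instanceof instead of the getClass() comparison above, which also accepts subclasses):

public static DataType getDataType(Object value) {
    if (value instanceof Integer) {
        return DataTypes.IntegerType;
    } else if (value instanceof Long) {
        return DataTypes.LongType;
    } else if (value instanceof Double) {
        return DataTypes.DoubleType;
    } else if (value instanceof Boolean) {
        return DataTypes.BooleanType;
    } else if (value instanceof String) {
        return DataTypes.StringType;
    } else if (value instanceof java.sql.Timestamp) {
        return DataTypes.TimestampType;
    } else {
        throw new IllegalArgumentException("unknown type for value " + value);
    }
}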