我正在尝试使用spark 2.0.2将JSON文件转换为镶木地板。
- JSON文件来自外部源,因此在到达之前无法更改模式
- 该文件包含属性映射。在我收到文件之前,属性名称是未知的
- 属性名称包含不能在镶木地板中使用的字符
{
"id" : 1,
"name" : "test",
"attributes" : {
"name=attribute" : 10,
"name=attribute with space" : 100,
"name=something else" : 10
}
}
空格和等号字符都不能在镶木地板中使用,我得到以下错误:
org.apache.spark.sql.AnalysisException:属性名称"name=Attribute"在",;{}()\n\t="中包含无效字符。请使用别名对其进行重命名
- 由于这些是嵌套字段,我无法使用别名重命名它们,这是真的吗
- 我已经尝试按照以下建议重命名模式中的字段:如何重命名与嵌套JSON对应的DataFrame中的字段。这适用于某些文件,但是,我现在得到以下stackoverflow:
java.lang.StackOverflowError在scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65)网址:org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:258)在org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheddler$$getPreferredLocsInternal(DAGSchedler.scala:1563)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply$mcVI$sp(DAGScheudler.scala:1579)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply(DAGScheudler.scala:1578)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply(DAGScheudler.scala:1578)位于scala.collection.invariable.List.foreach(List.scala:381)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheudler.scala:1578)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheudler.scala:1576)位于scala.collection.invariable.List.foreach(List.scala:381)在org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheddler$$getPreferredLocsInternal(DAGSchedler.scala:1576)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply$mcVI$sp(DAGScheudler.scala:1579)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply(DAGScheudler.scala:1578)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply(DAGScheudler.scala:1578)位于scala.collection.invariable.List.foreach(List.scala:381)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheudler.scala:1578)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheudler.scala:1576)位于scala.collection.invariable.List.foreach(List.scala:381)。。。重复…
我想做以下操作之一:
- 在将数据加载到spark中时,从字段名称中删除无效字符
- 在不导致堆栈溢出的情况下更改架构中的列名
- 以某种方式更改模式以加载原始数据,但在内部使用以下内容:
{
"id" : 1,
"name" : "test",
"attributes" : [
{"key":"name=attribute", "value" : 10},
{"key":"name=attribute with space", "value" : 100},
{"key":"name=something else", "value" : 10}
]
}
我这样解决了问题:
df.toDF(df
.schema
.fieldNames
.map(name => "[ ,;{}()\n\t=]+".r.replaceAllIn(name, "_")): _*)
其中我用"_"替换了所有不正确的符号。
到目前为止,我找到的唯一可行的解决方案是用修改后的模式重新加载数据。新模式将把属性加载到映射中。
Dataset<Row> newData = sql.read().json(path);
StructType newSchema = (StructType) toMapType(newData.schema(), null, "attributes");
newData = sql.read().schema(newSchema).json(path);
private DataType toMapType(DataType dataType, String fullColName, String col) {
if (dataType instanceof StructType) {
StructType structType = (StructType) dataType;
List<StructField> renamed = Arrays.stream(structType.fields()).map(
f -> toMapType(f, fullColName == null ? f.name() : fullColName + "." + f.name(), col)).collect(Collectors.toList());
return new StructType(renamed.toArray(new StructField[renamed.size()]));
}
return dataType;
}
private StructField toMapType(StructField structField, String fullColName, String col) {
if (fullColName.equals(col)) {
return new StructField(col, new MapType(DataTypes.StringType, DataTypes.LongType, true), true, Metadata.empty());
} else if (col.startsWith(fullColName)) {
return new StructField(structField.name(), toMapType(structField.dataType(), fullColName, col), structField.nullable(), structField.metadata());
}
return structField;
}
我对@:
也有同样的问题。
在我们的案例中,我们解决了讨好DataFrame的问题。
val ALIAS_RE: Regex = "[_.:@]+".r
val FIRST_AT_RE: Regex = "^_".r
def getFieldAlias(field_name: String): String = {
FIRST_AT_RE.replaceAllIn(ALIAS_RE.replaceAllIn(field_name, "_"), "")
}
def selectFields(df: DataFrame, fields: List[String]): DataFrame = {
var fields_to_select = List[Column]()
for (field <- fields) {
val alias = getFieldAlias(field)
fields_to_select +:= col(field).alias(alias)
}
df.select(fields_to_select: _*)
}
所以下面的json:
{
object: 'blabla',
schema: {
@type: 'blabla',
name@id: 'blabla'
}
}
它将被转换为[object,schema.@type,schema.name@id]。@和dots(在您的情况下=)将给SparkSQL带来问题。
因此,在我们的SelectFields之后,您可以以[对象,schema_type,schema_name_id]。压扁的DataFrame。