将包含无效字符的嵌套字段从Spark 2导出到Parquet



我正在尝试使用spark 2.0.2将JSON文件转换为镶木地板。

  • JSON文件来自外部源,因此在到达之前无法更改模式
  • 该文件包含属性映射。在我收到文件之前,属性名称是未知的
  • 属性名称包含不能在镶木地板中使用的字符
{
"id" : 1,
"name" : "test",
"attributes" : {
"name=attribute" : 10,
"name=attribute with space" : 100,
"name=something else" : 10
}
}

空格和等号字符都不能在镶木地板中使用,我得到以下错误:

org.apache.spark.sql.AnalysisException:属性名称"name=Attribute"在",;{}()\n\t="中包含无效字符。请使用别名对其进行重命名
  • 由于这些是嵌套字段,我无法使用别名重命名它们,这是真的吗
  • 我已经尝试按照以下建议重命名模式中的字段:如何重命名与嵌套JSON对应的DataFrame中的字段。这适用于某些文件,但是,我现在得到以下stackoverflow:
java.lang.StackOverflowError在scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65)网址:org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:258)在org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheddler$$getPreferredLocsInternal(DAGSchedler.scala:1563)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply$mcVI$sp(DAGScheudler.scala:1579)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply(DAGScheudler.scala:1578)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply(DAGScheudler.scala:1578)位于scala.collection.invariable.List.foreach(List.scala:381)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheudler.scala:1578)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheudler.scala:1576)位于scala.collection.invariable.List.foreach(List.scala:381)在org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheddler$$getPreferredLocsInternal(DAGSchedler.scala:1576)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply$mcVI$sp(DAGScheudler.scala:1579)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply(DAGScheudler.scala:1578)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfon$apply$1.apply(DAGScheudler.scala:1578)位于scala.collection.invariable.List.foreach(List.scala:381)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheudler.scala:1578)在org.apache.spark.scheduler.DAGScheduler$$anonfun$org.apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheudler.scala:1576)位于scala.collection.invariable.List.foreach(List.scala:381)。。。重复…

我想做以下操作之一:

  • 在将数据加载到spark中时,从字段名称中删除无效字符
  • 在不导致堆栈溢出的情况下更改架构中的列名
  • 以某种方式更改模式以加载原始数据,但在内部使用以下内容:
{
"id" : 1,
"name" : "test",
"attributes" : [
{"key":"name=attribute", "value" : 10},
{"key":"name=attribute with space", "value"  : 100},
{"key":"name=something else", "value" : 10}
]
}

我这样解决了问题:

df.toDF(df
.schema
.fieldNames
.map(name => "[ ,;{}()\n\t=]+".r.replaceAllIn(name, "_")): _*)

其中我用"_"替换了所有不正确的符号。

到目前为止,我找到的唯一可行的解决方案是用修改后的模式重新加载数据。新模式将把属性加载到映射中。

Dataset<Row> newData = sql.read().json(path);
StructType newSchema = (StructType) toMapType(newData.schema(), null, "attributes");
newData = sql.read().schema(newSchema).json(path);

private DataType toMapType(DataType dataType, String fullColName, String col) {
if (dataType instanceof StructType) {
StructType structType = (StructType) dataType;
List<StructField> renamed = Arrays.stream(structType.fields()).map(
f -> toMapType(f, fullColName == null ? f.name() : fullColName + "." + f.name(), col)).collect(Collectors.toList());
return new StructType(renamed.toArray(new StructField[renamed.size()]));
}
return dataType;
}
private StructField toMapType(StructField structField, String fullColName, String col) {
if (fullColName.equals(col)) {
return new StructField(col, new MapType(DataTypes.StringType, DataTypes.LongType, true), true, Metadata.empty());
} else if (col.startsWith(fullColName)) {
return new StructField(structField.name(), toMapType(structField.dataType(), fullColName, col), structField.nullable(), structField.metadata());
}
return structField;
}

我对@:也有同样的问题。

在我们的案例中,我们解决了讨好DataFrame的问题。

val ALIAS_RE: Regex = "[_.:@]+".r
val FIRST_AT_RE: Regex = "^_".r
def getFieldAlias(field_name: String): String = {
FIRST_AT_RE.replaceAllIn(ALIAS_RE.replaceAllIn(field_name, "_"), "")
}
def selectFields(df: DataFrame, fields: List[String]): DataFrame = {
var fields_to_select = List[Column]()
for (field <- fields) {
val alias = getFieldAlias(field)
fields_to_select +:= col(field).alias(alias)
}
df.select(fields_to_select: _*)
}

所以下面的json:

{ 
object: 'blabla',
schema: {
@type: 'blabla',
name@id: 'blabla'
}
}

它将被转换为[object,schema.@type,schema.name@id]。@dots(在您的情况下=)将给SparkSQL带来问题。

因此,在我们的SelectFields之后,您可以以[对象,schema_type,schema_name_id]。压扁的DataFrame。

相关内容

  • 没有找到相关文章

最新更新