Cannot find the path to a nested column in a Spark DataFrame



I've run into a problem, and I'm not sure whether it lies in Spark DataFrames or in Spark-XML, which I use to parse XML files into Spark. I would really appreciate your help.

So, I have the following XML:
<root>
  <path>
    <to>
      <atag>
        <atag_number>1</atag_number>
        <more>
          <again>
            <text>1111</text>
          </again>
        </more>
        <more>
          <again>
            <text>2222</text>
          </again>
        </more>
        <more>
          <again>
            <text>3333</text>
          </again>
        </more>
      </atag>
      <atag>
        <atag_number>2</atag_number>
        <more>
          <again>
            <text>4444</text>
          </again>
        </more>
        <more>
          <again>
            <text>5555</text>
          </again>
        </more>
        <more>
          <again>
            <text>6666</text>
          </again>
        </more>
      </atag>
    </to>
  </path>
</root>

and I would like to end up with a table containing path.to.atag.more.again.text. I want the values to be atomic, so the column needs to be exploded so that there is one row per text value.
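For context, here is a minimal sketch of how such a DataFrame could be loaded with the spark-xml package; the row tag and file path below are my own assumptions, not details from the original setup:

    df = spark.read.format('com.databricks.spark.xml') \
        .option('rowTag', 'root') \
        .load('myfile.xml')   # hypothetical path
    df.printSchema()          # path.to.atag comes back as an array of structs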

If I select, for example, path.to.atag[0].more.again.text, I get the list ['1111', '2222', '3333']. But if I want all the atag elements in the file and select path.to.atag.more.again.text instead, I get an error saying:

Traceback (most recent call last):
  File "...spark-2.0.1-bin-hadoop2.7pythonpysparksqlutils.py", line 63, in deco
    return f(*a, **kw)
  File "...spark-2.0.1-bin-hadoop2.7pythonlibpy4j-0.10.3-src.zippy4jprotocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.selectExpr.
: org.apache.spark.sql.AnalysisException: No such struct field text in again; line 1 pos 0
    at org.apache.spark.sql.catalyst.expressions.ExtractValue$.findField(complexTypeExtractors.scala:85)
    at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:58)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:253)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:252)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:252)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:148)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5$$anonfun$31.apply(Analyzer.scala:604)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5$$anonfun$31.apply(Analyzer.scala:604)
    at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:604)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:600)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:191)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:201)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:205)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:205)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$5.apply(QueryPlan.scala:210)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:210)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:600)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:542)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:542)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:479)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:65)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:63)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:51)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2603)
    at org.apache.spark.sql.Dataset.select(Dataset.scala:969)
    at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1004)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Unknown Source)

During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "...MyModule.py", line 67, in <module>
    df_output = df.selectExpr('path.to.atag.more.again.text')
  File "...spark-2.0.1-bin-hadoop2.7pythonpysparksqldataframe.py", line 875, in selectExpr
    jdf = self._jdf.selectExpr(self._jseq(expr))
  File "...spark-2.0.1-bin-hadoop2.7pythonlibpy4j-0.10.3-src.zippy4jjava_gateway.py", line 1133, in __call__
  File "...spark-2.0.1-bin-hadoop2.7pythonpysparksqlutils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'No such struct field text in again; line 1 pos 0'

You should also explode the atag column, for example:

from pyspark.sql.functions import explode
atags = df.select(explode(df.path.to.atag))            # one row per <atag>
atags.select(explode(atags.col.more.again.text))       # one row per <text> value

The snippet above gives you a DataFrame with 6 rows, one row per text tag.

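The same two-step explode can also be written as one chained expression; this is a sketch, and the alias names are my own, not from the answer:

    from pyspark.sql.functions import explode

    texts = (df
             .select(explode('path.to.atag').alias('atag'))
             .select(explode('atag.more.again.text').alias('text')))
    texts.show()   # expect 6 rows: 1111 ... 6666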
EDIT:

If your XML files have different schemas, a Spark DataFrame is not the best fit (DataFrames are designed to work with files that share the same schema). If you are just looking for specific tags in the files, you can try the plain RDD API and parse each file with a DOM parser:

>>> from xml.dom.minidom import parseString
>>> def get_tags(xml, tag_name):
...   return [d.childNodes[0].data for d in parseString(xml).getElementsByTagName(tag_name)]
... 
>>> sc.wholeTextFiles('xmls').flatMap(lambda file: get_tags(file[1], "text")).collect()
[u'1111', u'2222', u'3333', u'4444', u'5555', u'6666']
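Note that wholeTextFiles yields (path, content) pairs, which is why file[1] is passed to get_tags. The helper can also be checked locally before running on the cluster; the sample string below is just an illustration:

    sample = '<root><more><again><text>1111</text></again></more></root>'   # illustrative only
    print(get_tags(sample, 'text'))   # ['1111'] (u'1111' on Python 2)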
