I've run into a problem and I'm not sure whether the issue lies with the Spark DataFrame or with Spark-XML, which I use to parse XML files into Spark. I'd really appreciate your help.
So I have the following XML:

<root>
  <path>
    <to>
      <atag>
        <atag_number>1</atag_number>
        <more>
          <again>
            <text>1111</text>
          </again>
        </more>
        <more>
          <again>
            <text>2222</text>
          </again>
        </more>
        <more>
          <again>
            <text>3333</text>
          </again>
        </more>
      </atag>
      <atag>
        <atag_number>2</atag_number>
        <more>
          <again>
            <text>4444</text>
          </again>
        </more>
        <more>
          <again>
            <text>5555</text>
          </again>
        </more>
        <more>
          <again>
            <text>6666</text>
          </again>
        </more>
      </atag>
    </to>
  </path>
</root>
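For context, the DataFrame is created with spark-xml roughly like the sketch below; the rowTag value, file name and package version are placeholders rather than my exact setup:

# Minimal sketch of the spark-xml read (Spark 2.0 with spark-xml assumed);
# 'root' as rowTag and the file name are only examples.
df = spark.read \
    .format('com.databricks.spark.xml') \
    .option('rowTag', 'root') \
    .load('myfile.xml')

df.printSchema()  # path.to.atag comes back as an array of structs, more as a nested array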
From this I'd like to get a table containing path.to.atag.more.again.text. I want the values to be atomic, so the column needs to be exploded to get one row per text value.
If I select, for example, path.to.atag[0].more.again.text, I get a list ['1111', '2222', '3333']. But if I want all the atag elements from the file and select path.to.atag.more.again.text, I get an error saying:
Traceback (most recent call last):
File "...spark-2.0.1-bin-hadoop2.7pythonpysparksqlutils.py", line 63, in deco
return f(*a, **kw)
File "...spark-2.0.1-bin-hadoop2.7pythonlibpy4j-0.10.3-src.zippy4jprotocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.selectExpr.
: org.apache.spark.sql.AnalysisException: No such struct field text in again; line 1 pos 0
at org.apache.spark.sql.catalyst.expressions.ExtractValue$.findField(complexTypeExtractors.scala:85)
at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:58)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:253)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:252)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:252)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:148)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5$$anonfun$31.apply(Analyzer.scala:604)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5$$anonfun$31.apply(Analyzer.scala:604)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:604)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:600)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:191)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:201)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:205)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:205)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$5.apply(QueryPlan.scala:210)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:210)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:600)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:542)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:542)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:479)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:51)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2603)
at org.apache.spark.sql.Dataset.select(Dataset.scala:969)
at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1004)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Unknown Source)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "...MyModule.py", line 67, in <module>
df_output = df.selectExpr('path.to.atag.more.again.text')
File "...spark-2.0.1-bin-hadoop2.7pythonpysparksqldataframe.py", line 875, in selectExpr
jdf = self._jdf.selectExpr(self._jseq(expr))
File "...spark-2.0.1-bin-hadoop2.7pythonlibpy4j-0.10.3-src.zippy4jjava_gateway.py", line 1133, in __call__
File "...spark-2.0.1-bin-hadoop2.7pythonpysparksqlutils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'No such struct field text in again; line 1 pos 0'
Because path.to.atag is itself an array, path.to.atag.more.again.text has to go through two levels of arrays (atag and more), which Spark cannot flatten with a dotted path alone. You should explode atag as well, e.g.:
from pyspark.sql.functions import explode

atags = df.select(explode(df.path.to.atag))         # one row per <atag> struct
atags.select(explode(atags.col.more.again.text))    # one row per <text> value
The snippet above will give you a 6-row DF, one row per text tag.
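If you prefer the final column to have a readable name, the two explodes can also be chained in one expression; col and alias are standard pyspark.sql functions, and the variable names here are only illustrative:

from pyspark.sql.functions import col, explode

texts = (df
         .select(explode(col('path.to.atag')).alias('atag'))            # one row per <atag>
         .select(explode(col('atag.more.again.text')).alias('text')))   # one row per <text>
texts.show()  # 6 rows: 1111 ... 6666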
EDIT: If you have XML files with different schemas, Spark DataFrames are not the best fit (DataFrames are designed to work with files that share the same schema). If you are only looking for specific tags in the files, you can try the plain RDD API and parse the files with DOM:
>>> from xml.dom.minidom import parseString
>>> def get_tags(xml, tag_name):
... return [d.childNodes[0].data for d in parseString(xml).getElementsByTagName(tag_name)]
...
>>> sc.wholeTextFiles('xmls').flatMap(lambda file: get_tags(file[1], "text")).collect()
[u'1111', u'2222', u'3333', u'4444', u'5555', u'6666']
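If you still need the result as a one-column DataFrame afterwards, the RDD can be converted back; this assumes a SparkSession (or SQLContext) has already been created in the session, which is what makes toDF() available:

>>> from pyspark.sql import Row
>>> texts = sc.wholeTextFiles('xmls').flatMap(lambda file: get_tags(file[1], "text"))
>>> texts.map(lambda t: Row(text=t)).toDF().show()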