从HDFS读取XML文件,用lxml.etree在Pyspark中解析



我已经使用lxml.etree用Python编写了一个解析器,现在我正试图在Hadoop集群上运行所述解析器。当我在本地运行该函数时,它按预期工作,但当我试图将其应用于集群上的文件时,我收到了以下错误(我在Pyspark shell、python3中执行以下操作(

xml_pathname = "hdfs://file_path/date_directory/example_one.xml"
xml_tree = etree.parse(xml_pathname)
OSError: Error reading file '/file_path/date_directory/example_one.xml': failed to load external entity 
"/file_path/date_directory/example_one.xml"

当我在终端中运行hdfs dfs -ls /file_path/date_directory/example_one.xml时,我可以看到该文件。

两个领域我会感谢帮助-

  1. 如何使用Pyspark将XML文件从集群加载到lxml.etree.parse((方法中
  2. 如何最好地扩展它以在Spark上有效运行?我想使用我的Python解析器在集群上解析数百万个XML文件——下面的修改是否有效,或者有更好的方法来大规模并行化和运行解析器吗?一般来说,我应该如何在我的火花配置中设置参数以获得最佳结果(大量执行者、多个驾驶员等(
#Same as above but with wildcards to parse millions of XML files
xml_pathname = "hdfs://file_path/*/*.xml"
xml_tree = etree.parse(xml_pathname)

我已经为此工作了一段时间,非常感谢任何帮助。感谢你们

  1. mapValues((函数被证明是有用的。Sark配置的XML解析器,如Pubmed解析器,也提供了有用的样板代码,如以下代码:
path_rdd = sc.parallelize(path_sample, numSlices=10000) # use only example path
parse_results_rdd = path_rdd.map(lambda x: Row(file_name=os.path.basename(x), **pp.parse_pubmed_xml(x)))
pubmed_oa_df = parse_results_rdd.toDF()
pubmed_oa_df_sel = pubmed_oa_df[['full_title', 'abstract', 'doi',
'file_name', 'pmc', 'pmid',
'publication_year', 'publisher_id',
'journal', 'subjects']]
pubmed_oa_df_sel.write.parquet(os.path.join(save_dir, 'pubmed_oa_%s.parquet' % date_update_str),
mode='overwrite')

https://github.com/titipata/pubmed_parser/blob/master/scripts/pubmed_oa_spark.py

  1. 使用fs.globStatus可以检索一个子目录中的多个XML文件

最新更新