Spark 将结果保存到 HDFS



我在HDFS上使用带有一个主节点和三个工作节点的Spark保存解析的XML文件时遇到了奇怪的行为,问题是

当我解析XMLFile并尝试保存在HDFS中时,文件无法与所有解析的结果一起保存。

当我通过指定

sc = SparkContext("local", "parser") 
and the spark-submit will be ./bin/spark-submit xml_parser.py

此运行在 HDFS 上提供 117MB 的解析文件,其中包含完整的记录。

如果在 spark-client 模式下执行代码,那么我做了以下操作,

 sc = SparkContext("spark://master:7077", "parser") 

而火花提交是,

./bin/spark-submit --master yarn-client --deploy-mode client --driver-memory 7g --executor-memory 4g  --executor-cores 2  xml_parser.py 1000

在HDFS上给我19MB的文件,记录不完整。

为了在两种情况下保存结果,我正在使用rdd.saveAsTextFile("hdfs://")

我正在使用Spark1.6.1-Hadoop2.6和 Apache Hadoop2.7.2

谁能帮我。我不明白为什么会发生这种情况。我有以下火花集群,

1 主 8 千兆字节内存

2-workerNode1 8GbRAM

3-workerNode2 8GbRAM

4-workerNode3 8GbRAM

我已经通过Hadoop-2.7.2配置了上述集群,具有1个主节点和3个数据节点,

如果我在 severNode 上给我,

24097 大师

21652 日元

23398 名称节点

23799资源管理器

23630 辅助名称节点

在所有数据节点上的 JPS,

8006 工人

7819节点管理器

27164 日元

7678 数据节点

通过检查HadoopNameNode ui master:9000给我三个实时数据节点

通过检查 master:7077 上的 SparkMaster Ui 给了我三个实时工作者

请看这里,

sc = SpakContext("spark://master:7077", "parser")
--------------------------------------------
 contains the logic of XMLParsing
--------------------------------------------
and I am appending the result in one list like,
cc_list.append([final_cll, Date,Time,int(cont[i]), float(values[i]),0])
Now I am Parallelizing the above cc_list like
 parallel_list = sc.parallelize(cc_list)
 parallel_list.saveAsTextFile("hdfs://master:9000/ some path")
 Now I am Doing some operations here.
 new_list = sc.textFile("hdfs://localhost:9000/some path/part-00000).map(lambda line:line.split(','))
 result = new_list.map(lambda x: (x[0]+',   '+x[3],float(x[4]))).sortByKey('true').coalesce(1)
 result = result.map(lambda x:x[0]+','+str(x[1]))
 result = result.map(lambda x: x.lstrip('[').rstrip(']').replace(' ','')).saveAsTextFile("hdfs://master:9000/some path1))

对不起,对于这里这样的愚蠢问题。 其实我发现了两个问题

1)在多个工作线程上运行时,

 parallel_list = sc.parallelize(cc_list) 

创建 4-5 个部件文件,parallel_list保存在 Hdfs 中,将 part-00000 转换为 part-00004,在加载parallel_list时,您可以在上面的代码中看到

new_list = sc.textFile(pathto parallel_list/part-00000) ==> so it was taking only the first part.

2)在本地模式下运行时,

 parallel_list = sc.parallelize(cc_list) was creating only one part file so i was able to pick whole file at one stroke.

因此,在与工人一起运行火花时,我想出了两个解决方案

1)我刚刚添加了part-*,同时从parallel_list创建new_list

2)通过传递 --conf spark.akka.frameSize=1000 和 spark 提交,将 spark.akka.frameSize 增加到 10000。

最新更新