我在HDFS上使用带有一个主节点和三个工作节点的Spark保存解析的XML文件时遇到了奇怪的行为,问题是
当我解析XMLFile并尝试保存在HDFS中时,文件无法与所有解析的结果一起保存。
当我通过指定
sc = SparkContext("local", "parser")
and the spark-submit will be ./bin/spark-submit xml_parser.py
此运行在 HDFS 上提供 117MB 的解析文件,其中包含完整的记录。
如果在 spark-client 模式下执行代码,那么我做了以下操作,
sc = SparkContext("spark://master:7077", "parser")
而火花提交是,
./bin/spark-submit --master yarn-client --deploy-mode client --driver-memory 7g --executor-memory 4g --executor-cores 2 xml_parser.py 1000
在HDFS上给我19MB的文件,记录不完整。
为了在两种情况下保存结果,我正在使用rdd.saveAsTextFile("hdfs://")
我正在使用Spark1.6.1-Hadoop2.6和 Apache Hadoop2.7.2
谁能帮我。我不明白为什么会发生这种情况。我有以下火花集群,
1 主 8 千兆字节内存
2-workerNode1 8GbRAM
3-workerNode2 8GbRAM
4-workerNode3 8GbRAM
我已经通过Hadoop-2.7.2配置了上述集群,具有1个主节点和3个数据节点,
如果我在 severNode 上给我,
24097 大师
21652 日元
23398 名称节点
23799资源管理器
23630 辅助名称节点
在所有数据节点上的 JPS,
8006 工人
7819节点管理器
27164 日元
7678 数据节点
通过检查HadoopNameNode ui master:9000给我三个实时数据节点
通过检查 master:7077 上的 SparkMaster Ui 给了我三个实时工作者
请看这里,
sc = SpakContext("spark://master:7077", "parser")
--------------------------------------------
contains the logic of XMLParsing
--------------------------------------------
and I am appending the result in one list like,
cc_list.append([final_cll, Date,Time,int(cont[i]), float(values[i]),0])
Now I am Parallelizing the above cc_list like
parallel_list = sc.parallelize(cc_list)
parallel_list.saveAsTextFile("hdfs://master:9000/ some path")
Now I am Doing some operations here.
new_list = sc.textFile("hdfs://localhost:9000/some path/part-00000).map(lambda line:line.split(','))
result = new_list.map(lambda x: (x[0]+', '+x[3],float(x[4]))).sortByKey('true').coalesce(1)
result = result.map(lambda x:x[0]+','+str(x[1]))
result = result.map(lambda x: x.lstrip('[').rstrip(']').replace(' ','')).saveAsTextFile("hdfs://master:9000/some path1))
对不起,对于这里这样的愚蠢问题。 其实我发现了两个问题
1)在多个工作线程上运行时,
parallel_list = sc.parallelize(cc_list)
创建 4-5 个部件文件,parallel_list保存在 Hdfs 中,将 part-00000 转换为 part-00004,在加载parallel_list时,您可以在上面的代码中看到
new_list = sc.textFile(pathto parallel_list/part-00000) ==> so it was taking only the first part.
2)在本地模式下运行时,
parallel_list = sc.parallelize(cc_list) was creating only one part file so i was able to pick whole file at one stroke.
因此,在与工人一起运行火花时,我想出了两个解决方案
1)我刚刚添加了part-*,同时从parallel_list创建new_list
2)通过传递 --conf spark.akka.frameSize=1000 和 spark 提交,将 spark.akka.frameSize 增加到 10000。