Hadoop Streaming Job 在 python 中失败



我有一个用Python编写的mapreduce作业。该程序在linux env中成功测试,但是当我在Hadoop下运行时失败了。

下面是作业命令:

hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1+169.127-streaming.jar 
   -input /data/omni/20110115/exp6-10122 -output /home/yan/visitorpy.out 
   -mapper SessionMap.py   -reducer  SessionRed.py  -file SessionMap.py 
   -file  SessionRed.py

会话*.py的模式为 755,#!/usr/bin/env python 是 *.py 文件中的第一行。 Mapper.py 是:

#!/usr/bin/env python
import sys
 for line in sys.stdin:
         val=line.split("t")
         (visidH,visidL,sessionID)=(val[4],val[5],val[108])
         print "%s%st%s" % (visidH,visidL,sessionID)

日志中的错误:

java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:260)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:110)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:126)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

我遇到了同样的问题,并且想知道,因为当我在测试数据上测试我的映射器和化简器时,它会运行。但是当我通过hadoop映射reduce运行相同的测试集时,我曾经遇到过同样的问题。

如何在本地测试代码:

cat <test file> | python mapper.py | sort | python reducer.py

经过进一步调查,我发现我的 mapper.py 脚本中没有包含"shebang line"。

#!/usr/bin/python

请将上面的行添加为python脚本的第一行,并在此之后留下一个空行。

如果你需要了解更多关于"shebang line"的信息,请阅读为什么人们在Python脚本的第一行写#!/usr/bin/env python?

您可以在Hadoop网页界面中找到python错误消息(例如回溯)和脚本编写到stdrr的其他内容。它有点隐藏,但您会在流媒体为您提供的链接中找到它。您单击"映射"或"减少",然后单击任何任务,然后在"全部"列中执行任务日志

最后我

修复了这个错误,这是我学到的教训。1)原始代码对错误数据没有错误处理。当我在一个小数据集上测试代码时,我没有发现问题。2)为了处理空字段/变量,我发现在Python中测试None和空字符串有点棘手。就我个人而言,我喜欢函数len(strVar),它易于阅读且有效。3)在这种情况下,hadoop命令是正确的。不知何故,模式为 644 的 *.py 可以在我使用的环境中成功运行。

Hadoop Streaming - Hadoop 1.0.x

我遇到了同样的"断管"问题。问题是我的化简器中的"中断"语句。所以,一切都很顺利,直到"休息"。之后,正在运行的减速器停止运行打印"断管"错误。此外,另一个减速器开始运行,与前一个有着相同的命运。这个圈子一直在继续。

如果我理解正确,当化简器开始从 stdin(这是我的情况,在 for 循环中)读取时,它必须读取所有内容。即使您像我尝试的那样关闭 stdin( os.close( 0),您也无法"中断"此操作)。

我今天在使用Hadoop 1.0.1时遇到了同样的问题。幸运的是,我已经通过以下方式解决了它:

Hadoop ... -mapper $cwd/mapper.py -reducer

$cwd/reducer.py ...

(我的 Python 脚本在当前目录中)。看起来现在绝对路径是必要的。

最好!

脏输入可能会导致此问题。

尝试使用 try{} 来避免这种情况。

#!/usr/bin/env python
import sys
for line in sys.stdin:
    try:
         val=line.split("t")
         (visidH,visidL,sessionID)=(val[4],val[5],val[108])
         print "%s%st%s" % (visidH,visidL,sessionID)
    except Exception  as e:
        pass

Python + Hadoop在某些不应该的细节上是棘手的。看看这里。

尝试将输入路径括在双引号中。(-输入 "/data/omni/20110115/exp6-10122")

一种可能的解决方案是包含"python",即:

-mapper  "python ./mapper.py"
-reducer "python ./reducer.py" 

相关内容

  • 没有找到相关文章

最新更新