我有一个用Python编写的mapreduce作业。该程序在linux env中成功测试,但是当我在Hadoop下运行时失败了。
下面是作业命令:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1+169.127-streaming.jar
-input /data/omni/20110115/exp6-10122 -output /home/yan/visitorpy.out
-mapper SessionMap.py -reducer SessionRed.py -file SessionMap.py
-file SessionRed.py
会话*.py的模式为 755,#!/usr/bin/env python
是 *.py 文件中的第一行。 Mapper.py 是:
#!/usr/bin/env python
import sys
for line in sys.stdin:
val=line.split("t")
(visidH,visidL,sessionID)=(val[4],val[5],val[108])
print "%s%st%s" % (visidH,visidL,sessionID)
日志中的错误:
java.io.IOException: Broken pipe
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:260)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
at java.io.DataOutputStream.write(DataOutputStream.java:90)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:110)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:126)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
我遇到了同样的问题,并且想知道,因为当我在测试数据上测试我的映射器和化简器时,它会运行。但是当我通过hadoop映射reduce运行相同的测试集时,我曾经遇到过同样的问题。
如何在本地测试代码:
cat <test file> | python mapper.py | sort | python reducer.py
经过进一步调查,我发现我的 mapper.py 脚本中没有包含"shebang line"。
#!/usr/bin/python
请将上面的行添加为python脚本的第一行,并在此之后留下一个空行。
如果你需要了解更多关于"shebang line"的信息,请阅读为什么人们在Python脚本的第一行写#!/usr/bin/env python?
您可以在Hadoop网页界面中找到python错误消息(例如回溯)和脚本编写到stdrr的其他内容。它有点隐藏,但您会在流媒体为您提供的链接中找到它。您单击"映射"或"减少",然后单击任何任务,然后在"全部"列中执行任务日志
修复了这个错误,这是我学到的教训。1)原始代码对错误数据没有错误处理。当我在一个小数据集上测试代码时,我没有发现问题。2)为了处理空字段/变量,我发现在Python中测试None和空字符串有点棘手。就我个人而言,我喜欢函数len(strVar),它易于阅读且有效。3)在这种情况下,hadoop命令是正确的。不知何故,模式为 644 的 *.py 可以在我使用的环境中成功运行。
Hadoop Streaming - Hadoop 1.0.x
我遇到了同样的"断管"问题。问题是我的化简器中的"中断"语句。所以,一切都很顺利,直到"休息"。之后,正在运行的减速器停止运行打印"断管"错误。此外,另一个减速器开始运行,与前一个有着相同的命运。这个圈子一直在继续。
如果我理解正确,当化简器开始从 stdin(这是我的情况,在 for 循环中)读取时,它必须读取所有内容。即使您像我尝试的那样关闭 stdin( os.close( 0),您也无法"中断"此操作)。
我今天在使用Hadoop 1.0.1时遇到了同样的问题。幸运的是,我已经通过以下方式解决了它:
Hadoop ... -mapper $cwd/mapper.py -reducer$cwd/reducer.py ...
(我的 Python 脚本在当前目录中)。看起来现在绝对路径是必要的。
最好!
脏输入可能会导致此问题。
尝试使用 try{} 来避免这种情况。
#!/usr/bin/env python
import sys
for line in sys.stdin:
try:
val=line.split("t")
(visidH,visidL,sessionID)=(val[4],val[5],val[108])
print "%s%st%s" % (visidH,visidL,sessionID)
except Exception as e:
pass
Python + Hadoop在某些不应该的细节上是棘手的。看看这里。
尝试将输入路径括在双引号中。(-输入 "/data/omni/20110115/exp6-10122")
一种可能的解决方案是包含"python",即:
-mapper "python ./mapper.py"
-reducer "python ./reducer.py"