Im使用hadoop mapreduce执行以下任务:
我的映射程序将从HDFS读取.xml
文件,并将其传递给某个服务。我的setup()
:中有一些类似的代码
try
{
System.out.println(propertyName);
session = FindPath.createSession("localhost",3250, EncodingConstants.en_ISO_8859_1);
session.open();
}
catch
{
System.out.println("error");
}
当数据节点没有FindPath
[FindPath-服务尚未启动]时,它将引发异常。
现在我的问题是,我的映射程序没有移动到具有正在运行的FindPath
服务的另一个集群。
示例:
数据节点1数据节点2-两个数据节点
如果DataNode 1尚未启动FindPath
服务,则Input应移至DataNode 2。
我怎样才能做到这一点?
当Datanode 1中的TaskStatus
抛出异常时,我如何将其更改为"Failed"
更新
Job j;
catch(Exception Ex)
{
j.failTask((TaskAttemptID)context.getTaskAttemptID());
System.out.println("error");
}
我使用过类似的东西,但它抛出了一个NullPointer异常。
如何在我的映射器或映射器设置新API中使用failTask
?
使用JobClient
访问RunningJob
类(我有1.0.4 API)。
所以代码看起来是这样的:
在你的setup()
中有一个JobClient
和RunningJob
参考。
方法如下:
public void setup(Context context)
{
JobClient jobClient;
RunningJob runningJob;
try
{
jobClient = new JobClient((JobConf)context.getConfiguration());
runningJob = jobClient.getJob((JobID)(context.getJobId()); //mapred.JobID!
}
catch (IOException e)
{
System.out.println("IO Exception");
}
try
{
System.out.println(propertyName);
session = FindPath.createSession("localhost",3250, EncodingConstants.en_ISO_8859_1);
session.open();
}
catch
{
System.out.println("error");
runningJob.killTask((TaskAttemptID)context.getTaskAttemptID(), true);// cast as mapred.TaskAttemptID
}
}
这会导致TaskAttempt失败。
最后,您可能应该将mapred.map.max.attempts
设置为1,这样失败的taskAttempt就是失败的任务。
注:
您应该考虑更改 mapred.max.map.failures.percent
,因为它反映了集群对失败任务的容忍度。