如何在hadoopmapreduce中将任务状态更改为failed on exception



Im使用hadoop mapreduce执行以下任务:

我的映射程序将从HDFS读取.xml文件,并将其传递给某个服务。我的setup():中有一些类似的代码

try
{
    System.out.println(propertyName);
    session = FindPath.createSession("localhost",3250, EncodingConstants.en_ISO_8859_1);
    session.open();
}
catch 
{
    System.out.println("error");
}

当数据节点没有FindPath[FindPath-服务尚未启动]时,它将引发异常。

现在我的问题是,我的映射程序没有移动到具有正在运行的FindPath服务的另一个集群。

示例

数据节点1数据节点2-两个数据节点

如果DataNode 1尚未启动FindPath服务,则Input应移至DataNode 2。

我怎样才能做到这一点?

当Datanode 1中的TaskStatus抛出异常时,我如何将其更改为"Failed"

更新

Job j;
catch(Exception Ex)
{   
    j.failTask((TaskAttemptID)context.getTaskAttemptID());
    System.out.println("error");
}

我使用过类似的东西,但它抛出了一个NullPointer异常。

如何在我的映射器或映射器设置新API中使用failTask

使用JobClient访问RunningJob类(我有1.0.4 API)。

所以代码看起来是这样的:

在你的setup()中有一个JobClientRunningJob参考。

方法如下:

public void setup(Context context)
{
    JobClient jobClient;
    RunningJob runningJob;
    try 
    {
        jobClient = new JobClient((JobConf)context.getConfiguration());
        runningJob = jobClient.getJob((JobID)(context.getJobId()); //mapred.JobID!
    }
    catch (IOException e)
    {
        System.out.println("IO Exception");
    }

    try
    {
        System.out.println(propertyName);
        session = FindPath.createSession("localhost",3250, EncodingConstants.en_ISO_8859_1);
        session.open();
    }
    catch 
    {
        System.out.println("error");
        runningJob.killTask((TaskAttemptID)context.getTaskAttemptID(), true);// cast as mapred.TaskAttemptID
    }
}

这会导致TaskAttempt失败。

最后,您可能应该将mapred.map.max.attempts设置为1,这样失败的taskAttempt就是失败的任务。

注:

您应该考虑更改 mapred.max.map.failures.percent,因为它反映了集群对失败任务的容忍度。

相关内容

  • 没有找到相关文章

最新更新